diff --git a/flocks/cli/commands/task.py b/flocks/cli/commands/task.py index 4d7d848d..38cee268 100644 --- a/flocks/cli/commands/task.py +++ b/flocks/cli/commands/task.py @@ -96,12 +96,19 @@ async def _list_tasks(status_val, type_val, limit, fmt): scheduler_status = None if status_val == "running": scheduler_status = SchedulerStatus.ACTIVE - elif status_val == "paused": + elif status_val in ("paused", "disabled"): scheduler_status = SchedulerStatus.DISABLED tasks, total = await TaskManager.list_schedulers(status=scheduler_status, limit=limit) else: + task_status = None + if status_val: + mapped_status = "cancelled" if status_val == "paused" else status_val + try: + task_status = TaskStatus(mapped_status) + except ValueError as exc: + raise typer.BadParameter(f"Invalid execution status: {status_val}") from exc tasks, total = await TaskManager.list_executions( - status=TaskStatus(status_val) if status_val else None, + status=task_status, limit=limit, ) @@ -121,7 +128,7 @@ async def _list_tasks(status_val, type_val, limit, fmt): status_icon = { "pending": "⏳", "queued": "📋", "running": "🟢", "completed": "✅", "failed": "❌", "cancelled": "🚫", - "paused": "⏸️", "stopped": "🛑", + "disabled": "⏸️", "stopped": "🛑", } for t in tasks: icon = status_icon.get(t.status.value, "·") diff --git a/flocks/server/routes/task_entities.py b/flocks/server/routes/task_entities.py index 7c6126ed..13d7681d 100644 --- a/flocks/server/routes/task_entities.py +++ b/flocks/server/routes/task_entities.py @@ -1,6 +1,7 @@ """Execution-centric task scheduler/execution routes.""" -from typing import List, Optional +from enum import Enum +from typing import List, Optional, Type from fastapi import APIRouter, HTTPException, Query, status from pydantic import BaseModel, ConfigDict, Field @@ -66,6 +67,77 @@ class PaginatedResponse(BaseModel): limit: int +def _parse_enum( + value: Optional[str], + enum_cls: Type[Enum], + *, + label: str, + legacy_aliases: Optional[dict[str, object]] = None, +): + if not value: + return None + mapped = legacy_aliases.get(value, value) if legacy_aliases else value + try: + return enum_cls(mapped) + except ValueError as exc: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail=f"Invalid {label}: {value}", + ) from exc + + +def _parse_scheduler_status_filter(status_filter: Optional[str]): + from flocks.task.models import SchedulerStatus + + return _parse_enum( + status_filter, + SchedulerStatus, + label="scheduler status", + legacy_aliases={ + "running": SchedulerStatus.ACTIVE, + "paused": SchedulerStatus.DISABLED, + }, + ) + + +def _parse_execution_status_filter(status_filter: Optional[str]): + from flocks.task.models import TaskStatus + + return _parse_enum( + status_filter, + TaskStatus, + label="execution status", + legacy_aliases={"paused": TaskStatus.CANCELLED}, + ) + + +def _parse_priority(priority: Optional[str]): + from flocks.task.models import TaskPriority + + return _parse_enum(priority, TaskPriority, label="task priority") + + +def _parse_delivery_status(delivery_status: Optional[str]): + from flocks.task.models import DeliveryStatus + + return _parse_enum(delivery_status, DeliveryStatus, label="delivery status") + + +def _parse_execution_mode(execution_mode: Optional[str]): + from flocks.task.models import ExecutionMode + + return _parse_enum(execution_mode, ExecutionMode, label="execution mode") + + +def _parse_task_type(task_type: str) -> str: + if task_type not in {"queued", "scheduled"}: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail=f"Invalid task type: {task_type}", + ) + return task_type + + @router.get("/task-system/notice") async def get_task_system_notice(): from flocks.task.manager import TaskManager @@ -114,11 +186,10 @@ async def list_schedulers( limit: int = Query(20, ge=1, le=100), ): from flocks.task.manager import TaskManager - from flocks.task.models import SchedulerStatus, TaskPriority items, total = await TaskManager.list_schedulers( - status=SchedulerStatus(status_filter) if status_filter else None, - priority=TaskPriority(priority) if priority else None, + status=_parse_scheduler_status_filter(status_filter), + priority=_parse_priority(priority), scheduled_only=scheduled_only, sort_by=sort_by, sort_order=sort_order, @@ -137,43 +208,50 @@ async def list_schedulers( async def create_scheduler(req: SchedulerCreateRequest): from flocks.task.manager import TaskManager from flocks.task.models import ( - ExecutionMode, SchedulerMode, - TaskPriority, TaskSource, TaskTrigger, build_schedule, ) - if req.type == "queued": - trigger = TaskTrigger(runImmediately=True) - mode = SchedulerMode.ONCE - elif req.run_once: - trigger = build_schedule( - run_once=True, - run_at=req.run_at, - cron=req.cron, - cron_description=req.cron_description, - timezone=req.timezone, - ) - mode = SchedulerMode.ONCE - else: - trigger = build_schedule( - run_once=False, - cron=req.cron, - cron_description=req.cron_description, - timezone=req.timezone, - ) - mode = SchedulerMode.CRON + task_type = _parse_task_type(req.type) + priority = _parse_priority(req.priority) + execution_mode = _parse_execution_mode(req.execution_mode) + try: + if task_type == "queued": + trigger = TaskTrigger(runImmediately=True) + mode = SchedulerMode.ONCE + elif req.run_once: + trigger = build_schedule( + run_once=True, + run_at=req.run_at, + cron=req.cron, + cron_description=req.cron_description, + timezone=req.timezone, + ) + mode = SchedulerMode.ONCE + else: + trigger = build_schedule( + run_once=False, + cron=req.cron, + cron_description=req.cron_description, + timezone=req.timezone, + ) + mode = SchedulerMode.CRON + except ValueError as exc: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail=str(exc), + ) from exc scheduler = await TaskManager.create_scheduler( title=req.title, description=req.description, mode=mode, - priority=TaskPriority(req.priority), + priority=priority, source=TaskSource(user_prompt=req.user_prompt) if req.user_prompt else None, trigger=trigger, - execution_mode=ExecutionMode(req.execution_mode), + execution_mode=execution_mode, agent_name=req.agent_name, workflow_id=req.workflow_id, skills=req.skills, @@ -198,29 +276,34 @@ async def get_scheduler(scheduler_id: str): @router.put("/task-schedulers/{scheduler_id}") async def update_scheduler(scheduler_id: str, req: SchedulerUpdateRequest): from flocks.task.manager import TaskManager - from flocks.task.models import ExecutionMode, TaskPriority fields = {k: v for k, v in req.model_dump(exclude_none=True).items()} if "priority" in fields: - fields["priority"] = TaskPriority(fields["priority"]) + fields["priority"] = _parse_priority(fields["priority"]) if "execution_mode" in fields: - fields["execution_mode"] = ExecutionMode(fields["execution_mode"]) + fields["execution_mode"] = _parse_execution_mode(fields["execution_mode"]) cron = fields.pop("cron", None) tz = fields.pop("timezone", None) cron_desc = fields.pop("cron_description", None) run_once = fields.pop("run_once", None) run_at = fields.pop("run_at", None) user_prompt = fields.pop("user_prompt", None) - scheduler = await TaskManager.update_scheduler_with_trigger( - scheduler_id, - fields=fields, - cron=cron, - timezone=tz, - cron_description=cron_desc, - run_once=run_once, - run_at=run_at, - user_prompt=user_prompt, - ) + try: + scheduler = await TaskManager.update_scheduler_with_trigger( + scheduler_id, + fields=fields, + cron=cron, + timezone=tz, + cron_description=cron_desc, + run_once=run_once, + run_at=run_at, + user_prompt=user_prompt, + ) + except ValueError as exc: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail=str(exc), + ) from exc if not scheduler: raise HTTPException(404, "Task scheduler not found") return scheduler.model_dump(mode="json", by_alias=True) @@ -296,13 +379,12 @@ async def list_executions( limit: int = Query(20, ge=1, le=100), ): from flocks.task.manager import TaskManager - from flocks.task.models import DeliveryStatus, TaskPriority, TaskStatus items, total = await TaskManager.list_executions( scheduler_id=scheduler_id, - status=TaskStatus(status_filter) if status_filter else None, - priority=TaskPriority(priority) if priority else None, - delivery_status=DeliveryStatus(delivery_status) if delivery_status else None, + status=_parse_execution_status_filter(status_filter), + priority=_parse_priority(priority), + delivery_status=_parse_delivery_status(delivery_status), sort_by=sort_by, sort_order=sort_order, offset=offset, @@ -316,6 +398,20 @@ async def list_executions( ) +@router.post("/task-executions/batch/cancel") +async def batch_cancel(req: BatchRequest): + from flocks.task.manager import TaskManager + + return {"cancelled": await TaskManager.batch_cancel(req.execution_ids)} + + +@router.post("/task-executions/batch/delete") +async def batch_delete(req: BatchRequest): + from flocks.task.manager import TaskManager + + return {"deleted": await TaskManager.batch_delete(req.execution_ids)} + + @router.get("/task-executions/{execution_id}") async def get_execution(execution_id: str): from flocks.task.manager import TaskManager @@ -345,27 +441,6 @@ async def cancel_execution(execution_id: str): raise HTTPException(404, "Task execution not found") return execution.model_dump(mode="json", by_alias=True) - -@router.post("/task-executions/{execution_id}/pause") -async def pause_execution(execution_id: str): - from flocks.task.manager import TaskManager - - execution = await TaskManager.pause_execution(execution_id) - if not execution: - raise HTTPException(404, "Task execution not found") - return execution.model_dump(mode="json", by_alias=True) - - -@router.post("/task-executions/{execution_id}/resume") -async def resume_execution(execution_id: str): - from flocks.task.manager import TaskManager - - execution = await TaskManager.resume_execution(execution_id) - if not execution: - raise HTTPException(404, "Task execution not found") - return execution.model_dump(mode="json", by_alias=True) - - @router.post("/task-executions/{execution_id}/retry") async def retry_execution(execution_id: str): from flocks.task.manager import TaskManager @@ -395,17 +470,3 @@ async def delete_execution(execution_id: str): return {"ok": True} -@router.post("/task-executions/batch/cancel") -async def batch_cancel(req: BatchRequest): - from flocks.task.manager import TaskManager - - return {"cancelled": await TaskManager.batch_cancel(req.execution_ids)} - - -@router.post("/task-executions/batch/delete") -async def batch_delete(req: BatchRequest): - from flocks.task.manager import TaskManager - - return {"deleted": await TaskManager.batch_delete(req.execution_ids)} - - diff --git a/flocks/task/executor.py b/flocks/task/executor.py index 8989b0f9..a2f24206 100644 --- a/flocks/task/executor.py +++ b/flocks/task/executor.py @@ -44,13 +44,19 @@ async def dispatch( execution: TaskExecution, scheduler: TaskScheduler, ) -> TaskExecution: + # The execution row was already flipped to RUNNING atomically with the + # queue ref inside TaskStore.claim_next_queue_execution. We only need + # to make sure the in-memory object reflects that and to persist the + # session_id once the task session has been created. + if execution.status != TaskStatus.RUNNING: + execution.status = TaskStatus.RUNNING + if execution.started_at is None: + execution.started_at = datetime.now(timezone.utc) + session_id: Optional[str] = None if execution.execution_mode == ExecutionMode.AGENT: session_id = await cls._create_task_session(execution, scheduler) - started_at = datetime.now(timezone.utc) - execution.status = TaskStatus.RUNNING - execution.started_at = started_at execution.session_id = session_id await TaskStore.update_execution(execution) diff --git a/flocks/task/manager.py b/flocks/task/manager.py index 6edeaa15..f5dc7740 100644 --- a/flocks/task/manager.py +++ b/flocks/task/manager.py @@ -37,6 +37,7 @@ _CLEANUP_INTERVAL_S: int = 3600 _RETRY_CHECK_INTERVAL_S: int = 30 _STALE_RECOVERY_INTERVAL_S: int = 30 +_ORPHANED_QUEUE_RECOVERY_INTERVAL_S: int = 30 _DISPATCH_GUARD_TIMEOUT_S: int = _TASK_ABSOLUTE_TIMEOUT_S + 30 _RUNNING_RECOVERY_TIMEOUT_S: int = _DISPATCH_GUARD_TIMEOUT_S + 300 @@ -68,6 +69,7 @@ def __init__( self._running = False self._last_retry_check: float = 0.0 self._last_stale_recovery_check: float = 0.0 + self._last_orphaned_queue_recovery_check: float = 0.0 # ------------------------------------------------------------------ # Lifecycle @@ -94,6 +96,9 @@ async def start( recovered = await mgr._recover_orphaned_executions() if recovered: log.info("manager.orphan_recovery", {"count": recovered}) + recovered_queued = await mgr._recover_orphaned_queued_executions() + if recovered_queued: + log.info("manager.orphaned_queue_recovery", {"count": recovered_queued}) mgr._running = True mgr._loop_task = asyncio.create_task(mgr._execution_loop()) @@ -302,6 +307,7 @@ async def delete_scheduler(cls, scheduler_id: str) -> bool: scheduler = await TaskStore.get_scheduler(scheduler_id) if not scheduler: return False + await cls._cleanup_scheduler_active_executions(scheduler.id) if scheduler.dedup_key and scheduler.dedup_key.startswith("builtin:"): scheduler.status = SchedulerStatus.ARCHIVED await TaskStore.update_scheduler(scheduler) @@ -445,36 +451,6 @@ async def cancel_execution( await cls._publish_execution_update(execution) return execution - @classmethod - async def pause_execution( - cls, execution_id: str - ) -> Optional[TaskExecution]: - execution = await TaskStore.get_execution(execution_id) - if not execution or execution.status != TaskStatus.RUNNING: - return execution - execution.status = TaskStatus.PAUSED - execution = await TaskStore.update_execution(execution) - await cls._publish_execution_update(execution) - return execution - - @classmethod - async def resume_execution( - cls, execution_id: str - ) -> Optional[TaskExecution]: - execution = await TaskStore.get_execution(execution_id) - if not execution or execution.status != TaskStatus.PAUSED: - return execution - execution.status = TaskStatus.QUEUED - execution.queued_at = datetime.now(timezone.utc) - execution.started_at = None - execution.completed_at = None - execution.duration_ms = None - execution.error = None - execution.result_summary = None - execution.session_id = None - execution = await cls._enqueue_execution(execution) - return execution - @classmethod async def retry_execution( cls, execution_id: str @@ -501,7 +477,7 @@ async def rerun_execution( execution = await TaskStore.get_execution(execution_id) if not execution: return None - if execution.status in (TaskStatus.RUNNING, TaskStatus.QUEUED, TaskStatus.PAUSED): + if execution.status in (TaskStatus.RUNNING, TaskStatus.QUEUED): await cls.cancel_execution(execution_id) scheduler = await TaskStore.get_scheduler(execution.scheduler_id) if not scheduler: @@ -517,10 +493,20 @@ async def delete_execution(cls, execution_id: str) -> bool: execution = await TaskStore.get_execution(execution_id) if not execution: return False - if execution.status in (TaskStatus.RUNNING, TaskStatus.QUEUED, TaskStatus.PAUSED): + if execution.status in (TaskStatus.RUNNING, TaskStatus.QUEUED): await cls.cancel_execution(execution_id) return await TaskStore.delete_execution(execution_id) + @classmethod + async def _cleanup_scheduler_active_executions(cls, scheduler_id: str) -> int: + active_executions = await TaskStore.list_active_executions_for_scheduler(scheduler_id) + cleaned = 0 + for execution in active_executions: + cancelled = await cls.cancel_execution(execution.id) + if cancelled is not None: + cleaned += 1 + return cleaned + @classmethod async def batch_cancel(cls, execution_ids: List[str]) -> int: count = 0 @@ -646,6 +632,12 @@ async def _execution_loop(self) -> None: if now - self._last_stale_recovery_check >= _STALE_RECOVERY_INTERVAL_S: self._last_stale_recovery_check = now await self._recover_stale_active_executions() + if ( + now - self._last_orphaned_queue_recovery_check + >= _ORPHANED_QUEUE_RECOVERY_INTERVAL_S + ): + self._last_orphaned_queue_recovery_check = now + await self._recover_orphaned_queued_executions() execution = await self.queue.dequeue() if execution: task = asyncio.create_task(self._run_execution(execution)) @@ -670,11 +662,21 @@ async def _run_execution(self, execution: TaskExecution) -> None: execution.completed_at = datetime.now(timezone.utc) await TaskStore.update_execution(execution) return + should_finish_queue_ref = True try: execution = await asyncio.wait_for( TaskExecutor.dispatch(execution, scheduler), timeout=_DISPATCH_GUARD_TIMEOUT_S, ) + except asyncio.CancelledError: + current = await TaskStore.get_execution(execution.id) + if current is not None: + execution = current + if not execution.is_terminal: + execution = await self._requeue_execution(execution) + should_finish_queue_ref = False + await self._publish_execution_update(execution) + raise except asyncio.TimeoutError: await self._cancel_execution_runtime(execution) log.error( @@ -690,7 +692,8 @@ async def _run_execution(self, execution: TaskExecution) -> None: execution = await self._mark_execution_failed(execution, str(exc)) finally: self.queue.mark_finished(execution.id) - await TaskStore.finish_queue_ref(execution.id) + if should_finish_queue_ref: + await TaskStore.finish_queue_ref(execution.id) await self._publish_execution_update(execution) if execution.status == TaskStatus.FAILED: @@ -728,11 +731,13 @@ async def _recover_orphaned_executions(self) -> int: orphans = await TaskStore.list_executions_by_status(TaskStatus.RUNNING) await TaskStore.requeue_running_refs() for execution in orphans: - execution.status = TaskStatus.QUEUED - execution.started_at = None - execution.session_id = None - await TaskStore.update_execution(execution) - await TaskStore.enqueue_execution_ref(execution.id) + await self._requeue_execution(execution) + return len(orphans) + + async def _recover_orphaned_queued_executions(self) -> int: + orphans = await TaskStore.list_orphaned_queued_executions() + for execution in orphans: + await self._requeue_execution(execution) return len(orphans) async def _cleanup_loop(self) -> None: @@ -838,13 +843,20 @@ def _drop_legacy_tables(cls) -> None: @classmethod async def _enqueue_execution(cls, execution: TaskExecution) -> TaskExecution: + return await cls._requeue_execution(execution) + + @classmethod + async def _requeue_execution(cls, execution: TaskExecution) -> TaskExecution: execution.status = TaskStatus.QUEUED execution.queued_at = datetime.now(timezone.utc) execution.started_at = None execution.completed_at = None execution.duration_ms = None + execution.session_id = None + execution.result_summary = None + execution.error = None execution = await TaskStore.update_execution(execution) - await TaskStore.enqueue_execution_ref(execution.id) + await TaskStore.ensure_queued_execution_ref(execution.id) return execution @staticmethod diff --git a/flocks/task/models.py b/flocks/task/models.py index 0be41298..ec1ca983 100644 --- a/flocks/task/models.py +++ b/flocks/task/models.py @@ -22,7 +22,6 @@ class TaskStatus(str, Enum): COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" - PAUSED = "paused" class SchedulerStatus(str, Enum): diff --git a/flocks/task/store.py b/flocks/task/store.py index 9dcadf0b..6441c734 100644 --- a/flocks/task/store.py +++ b/flocks/task/store.py @@ -38,6 +38,7 @@ async def init(cls) -> None: for stmt in _INDEX_STMTS: await cls._conn.execute(stmt) await cls._conn.commit() + await cls._normalize_legacy_paused_executions() cls._initialized = True log.info("task.store.initialized") @@ -311,6 +312,24 @@ async def list_scheduler_executions( limit=limit, ) + @classmethod + async def list_active_executions_for_scheduler( + cls, scheduler_id: str + ) -> List[TaskExecution]: + db = await cls._db() + db.row_factory = aiosqlite.Row + async with db.execute( + """ + SELECT * FROM task_executions + WHERE scheduler_id = ? + AND status IN ('pending', 'queued', 'running') + ORDER BY created_at ASC + """, + (scheduler_id,), + ) as cur: + rows = await cur.fetchall() + return [cls._row_to_execution(row) for row in rows] + @classmethod async def update_execution(cls, execution: TaskExecution) -> TaskExecution: execution.touch() @@ -421,6 +440,43 @@ async def enqueue_execution_ref( await db.commit() return ref + @classmethod + async def ensure_queued_execution_ref( + cls, execution_id: str + ) -> TaskExecutionQueueRef: + active = await cls.get_queue_ref(execution_id) + db = await cls._db() + if active is None: + ref = TaskExecutionQueueRef(execution_id=execution_id) + await db.execute( + """ + INSERT INTO task_execution_queue_refs (id, execution_id, status, created_at, started_at) + VALUES (?, ?, ?, ?, ?) + """, + ( + ref.id, + ref.execution_id, + ref.status.value, + ref.created_at.isoformat(), + None, + ), + ) + await db.commit() + return ref + + await db.execute( + """ + UPDATE task_execution_queue_refs + SET status = 'queued', started_at = NULL + WHERE id = ? + """, + (active.id,), + ) + await db.commit() + active.status = TaskStatus.QUEUED + active.started_at = None + return active + @classmethod async def get_queue_ref( cls, execution_id: str @@ -449,7 +505,7 @@ async def get_active_execution_for_scheduler( """ SELECT * FROM task_executions WHERE scheduler_id = ? - AND status IN ('pending', 'queued', 'running', 'paused') + AND status IN ('pending', 'queued', 'running') ORDER BY created_at DESC LIMIT 1 """, @@ -458,6 +514,25 @@ async def get_active_execution_for_scheduler( row = await cur.fetchone() return cls._row_to_execution(row) if row else None + @classmethod + async def list_orphaned_queued_executions(cls) -> List[TaskExecution]: + db = await cls._db() + db.row_factory = aiosqlite.Row + async with db.execute( + """ + SELECT e.* + FROM task_executions e + LEFT JOIN task_execution_queue_refs q + ON q.execution_id = e.id + AND q.status IN ('queued', 'running') + WHERE e.status = 'queued' + AND q.id IS NULL + ORDER BY COALESCE(e.queued_at, e.created_at) ASC + """ + ) as cur: + rows = await cur.fetchall() + return [cls._row_to_execution(row) for row in rows] + @classmethod async def claim_next_queue_execution( cls, *, exclude_ids: Optional[List[str]] = None @@ -508,15 +583,43 @@ async def claim_next_queue_execution( ), ) claimed_at = datetime.now(timezone.utc) - await db.execute( - """ - UPDATE task_execution_queue_refs - SET status = 'running', started_at = ? - WHERE id = ? AND status = 'queued' - """, - (claimed_at.isoformat(), queue_ref.id), - ) - await db.commit() + claimed_iso = claimed_at.isoformat() + # Atomically flip both the queue ref and the execution row so the two + # tables can never be observed in an inconsistent state. If any step + # of the transaction fails we will roll back and leave the execution + # in `queued`, letting the next poll retry the claim. + try: + await db.execute("BEGIN IMMEDIATE") + cur = await db.execute( + """ + UPDATE task_execution_queue_refs + SET status = 'running', started_at = ? + WHERE id = ? AND status = 'queued' + """, + (claimed_iso, queue_ref.id), + ) + if cur.rowcount == 0: + await db.rollback() + return None + await db.execute( + """ + UPDATE task_executions + SET status = 'running', + started_at = COALESCE(started_at, ?), + updated_at = ? + WHERE id = ? + AND status IN ('pending', 'queued') + """, + (claimed_iso, claimed_iso, queue_ref.execution_id), + ) + await db.commit() + except Exception: + try: + await db.rollback() + except Exception: + pass + raise + queue_ref.status = TaskStatus.RUNNING queue_ref.started_at = claimed_at execution_data = dict(row) @@ -528,6 +631,12 @@ async def claim_next_queue_execution( "queue_ref_started_at", ): execution_data.pop(key, None) + # Reflect the atomic update in the in-memory snapshot so callers see + # the post-claim state (status=running, started_at=claimed_at). + execution_data["status"] = TaskStatus.RUNNING.value + if not execution_data.get("started_at"): + execution_data["started_at"] = claimed_iso + execution_data["updated_at"] = claimed_iso return cls._row_to_execution(execution_data), queue_ref @classmethod @@ -813,8 +922,53 @@ def _row_to_execution(cls, row: aiosqlite.Row | Dict[str, Any]) -> TaskExecution if data.get(col): data[col] = json.loads(data[col]) data.setdefault("execution_input_snapshot", {}) + cls._normalize_execution_row(data) return TaskExecution(**data) + @staticmethod + def _normalize_execution_row(data: Dict[str, Any]) -> None: + if data.get("status") != "paused": + return + data["status"] = TaskStatus.CANCELLED.value + data["completed_at"] = ( + data.get("completed_at") + or data.get("updated_at") + or data.get("created_at") + ) + if not data.get("error"): + data["error"] = "Normalized from legacy paused state." + + @classmethod + async def _normalize_legacy_paused_executions(cls) -> None: + db = await cls._db() + now = datetime.now(timezone.utc).isoformat() + await db.execute( + """ + DELETE FROM task_execution_queue_refs + WHERE status = 'paused' + OR execution_id IN ( + SELECT id FROM task_executions WHERE status = 'paused' + ) + """ + ) + await db.execute( + """ + UPDATE task_executions + SET status = ?, + completed_at = COALESCE(completed_at, updated_at, created_at, ?), + error = COALESCE(NULLIF(error, ''), ?), + updated_at = ? + WHERE status = 'paused' + """, + ( + TaskStatus.CANCELLED.value, + now, + "Normalized from legacy paused state.", + now, + ), + ) + await db.commit() + @classmethod def _row_to_queue_ref( cls, row: aiosqlite.Row | Dict[str, Any] diff --git a/flocks/tool/task/task_center.py b/flocks/tool/task/task_center.py index e168f967..a79cb2f7 100644 --- a/flocks/tool/task/task_center.py +++ b/flocks/tool/task/task_center.py @@ -5,6 +5,7 @@ create, list, update, delete, and query tasks via natural language. """ +import json from typing import Optional from flocks.tool.registry import ( @@ -20,6 +21,91 @@ log = Log.create(service="task.tools") +_TRUTHY_STRINGS = {"true", "1", "yes", "y", "on"} +_FALSY_STRINGS = {"false", "0", "no", "n", "off", ""} + + +def _coerce_legacy_bool(value: object, *, default: bool = False) -> bool: + """Coerce values that may arrive as strings from legacy clients.""" + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return bool(value) + if isinstance(value, str): + normalized = value.strip().lower() + if normalized in _TRUTHY_STRINGS: + return True + if normalized in _FALSY_STRINGS: + return False + return default + + +def _normalize_task_create_inputs( + type_value: Optional[str], + schedule_type: Optional[str], + run_once: bool, + run_at: Optional[str], + cron: Optional[str], + cron_description: Optional[str], + timezone: str, + schedule: Optional[str], +) -> tuple[Optional[str], bool, Optional[str], Optional[str], Optional[str], str]: + """Accept common task_create aliases and infer scheduled tasks.""" + schedule_data: dict[str, object] = {} + if schedule: + normalized_schedule = schedule.strip() + if normalized_schedule.startswith("{"): + try: + parsed_schedule = json.loads(normalized_schedule) + except json.JSONDecodeError: + parsed_schedule = None + if isinstance(parsed_schedule, dict): + schedule_data = parsed_schedule + else: + cron = cron or schedule + else: + cron = cron or schedule + + if schedule_data: + type_value = type_value or schedule_data.get("type") or schedule_data.get("task_type") + schedule_type = ( + schedule_type + or schedule_data.get("schedule_type") + or schedule_data.get("scheduleType") + ) + cron = cron or schedule_data.get("cron") + run_at = run_at or schedule_data.get("run_at") or schedule_data.get("runAt") + cron_description = ( + cron_description + or schedule_data.get("cron_description") + or schedule_data.get("cronDescription") + ) + timezone = str(schedule_data.get("timezone") or timezone) + if schedule_data.get("run_once") is not None: + run_once = _coerce_legacy_bool(schedule_data.get("run_once"), default=run_once) + elif schedule_data.get("runOnce") is not None: + run_once = _coerce_legacy_bool(schedule_data.get("runOnce"), default=run_once) + + if type_value: + return type_value, run_once, run_at, cron, cron_description, timezone + if not schedule_type: + # When the caller signalled a scheduled intent (run_once / run_at / cron), + # keep it scheduled so build_schedule can surface proper validation errors + # instead of silently falling back to an immediate queued execution. + if cron or run_at or run_once: + return "scheduled", run_once, run_at, cron, cron_description, timezone + return "queued", run_once, run_at, cron, cron_description, timezone + + normalized = schedule_type.strip().lower() + if normalized == "queued": + return "queued", run_once, run_at, cron, cron_description, timezone + if normalized in {"scheduled", "cron", "recurring", "repeat"}: + return "scheduled", False, run_at, cron, cron_description, timezone + if normalized in {"once", "one_time", "one-time", "run_once"}: + return "scheduled", True, run_at, cron, cron_description, timezone + return schedule_type, run_once, run_at, cron, cron_description, timezone + + # ====================================================================== # task_create # ====================================================================== @@ -76,9 +162,28 @@ "'scheduled' = triggered at a specific time (one-time or recurring, " "controlled by run_once)" ), - required=True, + required=False, enum=["queued", "scheduled"], ), + ToolParameter( + name="schedule_type", + type=ParameterType.STRING, + description=( + "Legacy alias for task schedule kind. " + "Accepted values include 'queued', 'scheduled', 'cron', " + "'once', 'one_time'. Prefer using type + run_once." + ), + required=False, + ), + ToolParameter( + name="schedule", + type=ParameterType.STRING, + description=( + "Legacy schedule alias. Can be a cron string like '*/5 * * * *' " + "or a JSON string containing cron/runAt/runOnce/timezone." + ), + required=False, + ), ToolParameter( name="run_once", type=ParameterType.BOOLEAN, @@ -159,13 +264,33 @@ ), required=False, ), + ToolParameter( + name="enabled", + type=ParameterType.BOOLEAN, + description=( + "Legacy compatibility field. False creates the task and then disables it. " + "True keeps it active." + ), + required=False, + ), + ToolParameter( + name="action", + type=ParameterType.STRING, + description=( + "Legacy compatibility field sometimes sent by models during task creation. " + "Ignored by task_create." + ), + required=False, + ), ], ) async def task_create( ctx: ToolContext, title: str, description: str, - type: str, + type: Optional[str] = None, + schedule_type: Optional[str] = None, + schedule: Optional[str] = None, run_once: bool = False, priority: str = "normal", run_at: Optional[str] = None, @@ -173,6 +298,8 @@ async def task_create( cron_description: Optional[str] = None, timezone: str = "Asia/Shanghai", user_prompt: Optional[str] = None, + enabled: Optional[bool] = None, + action: Optional[str] = None, ) -> ToolResult: from flocks.task.manager import TaskManager from flocks.task.models import ( @@ -183,6 +310,24 @@ async def task_create( build_schedule, ) + del action + + type, run_once, run_at, cron, cron_description, timezone = _normalize_task_create_inputs( + type, + schedule_type, + run_once, + run_at, + cron, + cron_description, + timezone, + schedule, + ) + if type is None: + return ToolResult( + success=False, + error="type or schedule_type is required", + ) + task_priority = TaskPriority(priority) if type == "queued": @@ -214,6 +359,8 @@ async def task_create( source=source, trigger=trigger, ) + if enabled is False: + scheduler = await TaskManager.disable_scheduler(scheduler.id) or scheduler output_lines = [ f"ID: {scheduler.id}", @@ -262,8 +409,9 @@ async def task_create( "Routing rules (IMPORTANT - read before calling):\n" "- No parameters -> lists scheduled task definitions (schedulers).\n" "- status='active' -> lists active scheduled tasks.\n" - "- status='paused' or 'disabled' -> lists paused or disabled scheduled tasks " - "unless type='execution' or type='queued' is explicitly provided.\n" + "- status='disabled' -> lists disabled scheduled tasks.\n" + "- legacy status='paused' is still accepted and mapped to disabled schedulers " + "or cancelled executions depending on query target.\n" "- status='running' / 'completed' / 'failed' / 'pending' / 'queued' / 'cancelled' " "-> lists task executions with that status.\n" "- type='scheduled' -> forces listing schedulers.\n" @@ -274,8 +422,7 @@ async def task_create( "-> call with no parameters.\n" "- 'How many tasks are currently running?' -> call with status='running'.\n" "- 'Which scheduled tasks are disabled?' -> call with status='disabled'.\n" - "- 'Which paused executions are waiting to resume?' -> call with " - "type='execution', status='paused'." + "- 'Show old paused tasks' -> call with status='paused'." ), category=ToolCategory.SYSTEM, parameters=[ @@ -284,10 +431,9 @@ async def task_create( type=ParameterType.STRING, description=( "Filter by status. " - "Scheduler statuses: 'active', 'disabled', 'paused'. " + "Scheduler statuses: 'active', 'disabled'. " "Execution statuses: 'pending', 'queued', 'running', 'completed', " - "'failed', 'cancelled', 'paused'. " - "If type is omitted, 'paused' defaults to scheduled tasks." + "'failed', 'cancelled'. Legacy alias: 'paused'." ), required=False, enum=[ @@ -368,7 +514,7 @@ async def task_list( scheduler_status = None if status == "active": scheduler_status = SchedulerStatus.ACTIVE - elif status in ("paused", "disabled"): + elif status in ("disabled", "paused"): scheduler_status = SchedulerStatus.DISABLED tasks, total = await TaskManager.list_schedulers( status=scheduler_status, @@ -378,7 +524,8 @@ async def task_list( label = "Scheduled tasks" else: try: - task_status = TaskStatus(status) if status else None + mapped_status = "cancelled" if status == "paused" else status + task_status = TaskStatus(mapped_status) if mapped_status else None except ValueError: return ToolResult( success=False, @@ -441,7 +588,29 @@ async def task_status(ctx: ToolContext, task_id: str) -> ToolResult: @ToolRegistry.register_function( name="task_update", - description="Update a task (priority, status, title). Supports cancel/pause/resume.", + description=( + "Update a task. By default action=update, which can modify scheduler " + "fields like title, description, priority, cron, run_once, run_at, " + "cron_description, timezone, and user_prompt. Supports enable/disable " + "for scheduled tasks, and cancel/retry " + "for execution tasks.\n\n" + "IMPORTANT:\n" + "- Pass update fields as top-level arguments. DO NOT wrap them inside " + "a `fields` object or JSON string.\n" + "- To stop a scheduled task, use action='disable', 'pause', or 'stop'. " + "To resume it, use action='enable', 'resume', or 'start'.\n" + "- When changing a schedule, also pass a human-readable `title` and " + "`cron_description` that reflect the new schedule, otherwise the task " + "title shown in the UI may remain the old wording.\n\n" + "Good example for recurring schedule update:\n" + "task_update(task_id='tsk_xxx', cron='*/10 * * * *', " + "title='每10分钟执行关键词搜索摘要生成工作流', " + "cron_description='每10分钟执行一次')\n" + "Good example for stopping a scheduled task:\n" + "task_update(task_id='tsk_xxx', action='disable')\n" + "Bad example:\n" + "task_update(task_id='tsk_xxx', fields='{\"cron\":\"*/10 * * * *\"}')" + ), category=ToolCategory.SYSTEM, parameters=[ ToolParameter( @@ -454,8 +623,12 @@ async def task_status(ctx: ToolContext, task_id: str) -> ToolResult: name="action", type=ParameterType.STRING, description="Action to perform", - required=True, - enum=["cancel", "pause", "resume", "retry", "update"], + required=False, + default="update", + enum=[ + "cancel", "retry", "update", + "disable", "enable", "pause", "resume", "stop", "start", + ], ), ToolParameter( name="priority", @@ -467,7 +640,67 @@ async def task_status(ctx: ToolContext, task_id: str) -> ToolResult: ToolParameter( name="title", type=ParameterType.STRING, - description="New title (only for action=update)", + description=( + "New title (only for action=update). When changing cron/run_at, " + "also update title so the UI wording matches the new schedule." + ), + required=False, + ), + ToolParameter( + name="description", + type=ParameterType.STRING, + description="New description (only for action=update)", + required=False, + ), + ToolParameter( + name="run_once", + type=ParameterType.BOOLEAN, + description="Update one-time vs recurring schedule", + required=False, + ), + ToolParameter( + name="run_at", + type=ParameterType.STRING, + description="ISO 8601 datetime for one-time scheduled execution", + required=False, + ), + ToolParameter( + name="cron", + type=ParameterType.STRING, + description=( + "Cron expression for recurring scheduled execution. Pass this as " + "a top-level argument, not inside a `fields` wrapper." + ), + required=False, + ), + ToolParameter( + name="cron_description", + type=ParameterType.STRING, + description=( + "Human-readable Chinese schedule description shown in UI. " + "When changing schedule, provide this together with title." + ), + required=False, + ), + ToolParameter( + name="timezone", + type=ParameterType.STRING, + description="Timezone for scheduled tasks", + required=False, + ), + ToolParameter( + name="user_prompt", + type=ParameterType.STRING, + description="Execution prompt stored with the scheduler", + required=False, + ), + ToolParameter( + name="enabled", + type=ParameterType.BOOLEAN, + description=( + "Enable or disable a scheduled task. False stops it; True resumes it. " + "Can be used with action=update as a compatibility shortcut." + ), required=False, ), ], @@ -475,28 +708,62 @@ async def task_status(ctx: ToolContext, task_id: str) -> ToolResult: async def task_update( ctx: ToolContext, task_id: str, - action: str, + action: str = "update", priority: Optional[str] = None, title: Optional[str] = None, + description: Optional[str] = None, + run_once: Optional[bool] = None, + run_at: Optional[str] = None, + cron: Optional[str] = None, + cron_description: Optional[str] = None, + timezone: Optional[str] = None, + user_prompt: Optional[str] = None, + enabled: Optional[bool] = None, ) -> ToolResult: from flocks.task.manager import TaskManager from flocks.task.models import TaskPriority - if action == "cancel": + normalized_action = (action or "update").lower() + if normalized_action in {"pause", "stop"}: + normalized_action = "disable" + elif normalized_action in {"resume", "start"}: + normalized_action = "enable" + + if normalized_action == "cancel": task = await TaskManager.cancel_execution(task_id) - elif action == "pause": - task = await TaskManager.pause_execution(task_id) - elif action == "resume": - task = await TaskManager.resume_execution(task_id) - elif action == "retry": + elif normalized_action == "retry": task = await TaskManager.retry_execution(task_id) - elif action == "update": + elif normalized_action == "disable": + task = await TaskManager.disable_scheduler(task_id) + elif normalized_action == "enable": + task = await TaskManager.enable_scheduler(task_id) + elif normalized_action == "update": fields = {} if priority: fields["priority"] = TaskPriority(priority) if title: fields["title"] = title - task = await TaskManager.update_scheduler(task_id, **fields) + if description is not None: + fields["description"] = description + try: + task = await TaskManager.update_scheduler_with_trigger( + task_id, + fields=fields, + cron=cron, + timezone=timezone, + cron_description=cron_description, + run_once=run_once, + run_at=run_at, + user_prompt=user_prompt, + ) + except ValueError as exc: + return ToolResult(success=False, error=str(exc)) + if enabled is False: + task = await TaskManager.disable_scheduler(task_id) or task + normalized_action = "disable" + elif enabled is True: + task = await TaskManager.enable_scheduler(task_id) or task + normalized_action = "enable" else: return ToolResult(success=False, error=f"Unknown action: {action}") @@ -506,7 +773,7 @@ async def task_update( return ToolResult( success=True, output=_format_task(task), - title=f"Task {action}d: {task.title}", + title=f"Task {normalized_action}d: {task.title}", ) @@ -546,7 +813,7 @@ async def task_delete(ctx: ToolContext, task_id: str) -> ToolResult: @ToolRegistry.register_function( name="task_rerun", - description="Rerun a task. If the task is currently running, it will be stopped and requeued.", + description="Rerun a task. If it is active, it will be cancelled and a new execution will be created.", category=ToolCategory.SYSTEM, parameters=[ ToolParameter( @@ -584,7 +851,6 @@ async def task_rerun(ctx: ToolContext, task_id: str) -> ToolResult: "completed": "✅", "failed": "❌", "cancelled": "🚫", - "paused": "⏸️", } diff --git a/tests/integration/test_task_queue_integration.py b/tests/integration/test_task_queue_integration.py index f3107089..bde3ab7b 100644 --- a/tests/integration/test_task_queue_integration.py +++ b/tests/integration/test_task_queue_integration.py @@ -14,7 +14,13 @@ from flocks.storage.storage import Storage from flocks.task.executor import TaskExecutor from flocks.task.manager import TaskManager -from flocks.task.models import ExecutionTriggerType, SchedulerMode, TaskStatus, TaskTrigger +from flocks.task.models import ( + ExecutionTriggerType, + SchedulerMode, + SchedulerStatus, + TaskStatus, + TaskTrigger, +) from flocks.task.store import TaskStore @@ -171,6 +177,101 @@ async def fake_dispatch(execution, scheduler): assert completed.result_summary == "后续任务 done" +@pytest.mark.asyncio +async def test_delete_scheduler_releases_claimed_queue_slot(tmp_path: Path): + await TaskManager.start(max_concurrent=1, poll_interval=999, scheduler_interval=999) + + scheduler = await TaskManager.create_scheduler( + title="删除释放槽位", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger(run_immediately=False), + workspace_directory=str(tmp_path / "workspace-1"), + ) + execution = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=True, + ) + + manager = TaskManager.get() + assert manager is not None + + claimed = await manager.queue.dequeue() + + assert claimed is not None + assert claimed.id == execution.id + assert execution.id in manager.queue._running_ids + + deleted = await TaskManager.delete_scheduler(scheduler.id) + + assert deleted is True + assert execution.id not in manager.queue._running_ids + assert await TaskManager.get_execution(execution.id) is None + assert await TaskStore.get_queue_ref(execution.id) is None + + next_scheduler = await TaskManager.create_scheduler( + title="后续可领取", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger(run_immediately=False), + workspace_directory=str(tmp_path / "workspace-2"), + ) + next_execution = await TaskManager.create_execution_from_scheduler( + next_scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=True, + ) + + next_claimed = await manager.queue.dequeue() + + assert next_claimed is not None + assert next_claimed.id == next_execution.id + + +@pytest.mark.asyncio +async def test_cancelled_worker_requeues_execution_instead_of_losing_queue_ref( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +): + async def fake_dispatch(_execution, _scheduler): + raise asyncio.CancelledError() + + monkeypatch.setattr(TaskExecutor, "dispatch", fake_dispatch) + await TaskManager.start(max_concurrent=1, poll_interval=999, scheduler_interval=999) + + scheduler = await TaskManager.create_scheduler( + title="取消后回队", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger(run_immediately=False), + workspace_directory=str(tmp_path / "workspace"), + ) + execution = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=True, + ) + + manager = TaskManager.get() + assert manager is not None + + claimed = await manager.queue.dequeue() + assert claimed is not None + assert claimed.id == execution.id + + with pytest.raises(asyncio.CancelledError): + await manager._run_execution(claimed) + + refreshed = await TaskManager.get_execution(execution.id) + queue_ref = await TaskStore.get_queue_ref(execution.id) + + assert refreshed is not None + assert refreshed.status == TaskStatus.QUEUED + assert refreshed.started_at is None + assert refreshed.completed_at is None + assert refreshed.session_id is None + assert queue_ref is not None + assert queue_ref.status == TaskStatus.QUEUED + + @pytest.mark.asyncio async def test_workflow_timeout_signals_cancel_and_stops_before_next_node( monkeypatch: pytest.MonkeyPatch, @@ -376,3 +477,116 @@ async def test_standalone_legacy_migration_script_migrates_existing_tables(tmp_p assert execution.session_id == "ses_123" assert TaskManager._legacy_tables_exist() is False assert state_path.exists() is False + + +@pytest.mark.asyncio +async def test_standalone_legacy_migration_preserves_paused_scheduled_task_history( + tmp_path: Path, +): + db = await TaskStore.raw_db() + now = datetime.now(timezone.utc) + created_at = (now - timedelta(minutes=5)).isoformat() + + await db.executescript( + """ + CREATE TABLE tasks ( + id TEXT PRIMARY KEY, + title TEXT, + description TEXT, + type TEXT, + status TEXT, + priority TEXT, + source TEXT, + schedule TEXT, + execution_mode TEXT, + agent_name TEXT, + workflow_id TEXT, + skills TEXT, + category TEXT, + context TEXT, + workspace_directory TEXT, + retry TEXT, + tags TEXT, + created_at TEXT, + updated_at TEXT, + created_by TEXT, + dedup_key TEXT, + delivery_status TEXT, + execution TEXT + ); + CREATE TABLE task_execution_records ( + id TEXT PRIMARY KEY, + task_id TEXT NOT NULL, + status TEXT, + delivery_status TEXT, + started_at TEXT, + completed_at TEXT, + duration_ms INTEGER, + session_id TEXT, + result_summary TEXT, + error TEXT + ); + CREATE TABLE task_queue_refs ( + id TEXT PRIMARY KEY, + task_id TEXT NOT NULL, + execution_record_id TEXT, + status TEXT, + created_at TEXT, + started_at TEXT + ); + """ + ) + await db.execute( + """ + INSERT INTO tasks ( + id, title, description, type, status, priority, source, schedule, + execution_mode, agent_name, context, workspace_directory, retry, + tags, created_at, updated_at, created_by + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + "task_paused_sched", + "旧 paused 定时任务", + "legacy paused scheduled task", + "scheduled", + "paused", + "normal", + json.dumps({"sourceType": "scheduled_trigger"}), + json.dumps({"runOnce": True, "runAt": created_at, "enabled": True}), + "agent", + "rex", + json.dumps({}), + str(tmp_path / "workspace"), + json.dumps({"maxRetries": 0, "retryDelaySeconds": 60, "retryCount": 0}), + json.dumps([]), + created_at, + created_at, + "migration", + ), + ) + await db.commit() + + script_path = Path(__file__).resolve().parents[2] / "scripts" / "migrate_legacy_task_tables.py" + state_path = tmp_path / "task_migration_state.json" + proc = await asyncio.create_subprocess_exec( + sys.executable, + str(script_path), + "--db", + str(Storage.get_db_path()), + "--state-file", + str(state_path), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + assert proc.returncode == 0, (stdout or b"").decode() + (stderr or b"").decode() + + scheduler = await TaskManager.get_scheduler("task_paused_sched") + execution = await TaskManager.get_execution("legacy_exec_task_paused_sched") + + assert scheduler is not None + assert scheduler.status in (SchedulerStatus.ACTIVE, SchedulerStatus.DISABLED) + assert execution is not None + assert execution.scheduler_id == "task_paused_sched" + assert execution.status == TaskStatus.CANCELLED + assert execution.error == "Normalized from legacy paused state." diff --git a/tests/server/test_server.py b/tests/server/test_server.py index 44fe5df4..6bd56b14 100644 --- a/tests/server/test_server.py +++ b/tests/server/test_server.py @@ -2,13 +2,23 @@ Tests for server module """ +from datetime import datetime, timezone + import pytest from httpx import AsyncClient, ASGITransport from fastapi import status from flocks.server.app import app from flocks.task.manager import TaskManager -from flocks.task.models import DeliveryStatus, SchedulerMode, TaskStatus, TaskTrigger +from flocks.task.store import TaskStore +from flocks.task.models import ( + DeliveryStatus, + ExecutionTriggerType, + SchedulerMode, + SchedulerStatus, + TaskStatus, + TaskTrigger, +) @pytest.fixture @@ -82,6 +92,29 @@ async def test_queue_items_endpoint(client): assert "total" in data +@pytest.mark.asyncio +async def test_create_scheduled_task_missing_cron_returns_422(client): + response = await client.post( + "/api/task-schedulers", + json={ + "title": "缺少 cron 的定时任务", + "type": "scheduled", + "runOnce": False, + }, + ) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_CONTENT + assert response.json()["message"] == "cron is required for recurring scheduled tasks" + + +@pytest.mark.asyncio +async def test_list_executions_invalid_priority_returns_422(client): + response = await client.get("/api/task-executions", params={"priority": "bad"}) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_CONTENT + assert response.json()["message"] == "Invalid task priority: bad" + + @pytest.mark.asyncio async def test_task_schedulers_scheduled_only_excludes_immediate_queue_templates(client): await TaskManager.create_scheduler( @@ -104,6 +137,22 @@ async def test_task_schedulers_scheduled_only_excludes_immediate_queue_templates assert len(ids) == 1 +@pytest.mark.asyncio +async def test_task_scheduler_list_accepts_legacy_paused_status_query(client): + scheduler = await TaskManager.create_scheduler( + title="兼容旧 paused 调度查询", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger(run_immediately=False), + ) + await TaskManager.disable_scheduler(scheduler.id) + + response = await client.get("/api/task-schedulers", params={"status": "paused"}) + + assert response.status_code == status.HTTP_200_OK + ids = {item["id"] for item in response.json()["items"]} + assert scheduler.id in ids + + @pytest.mark.asyncio async def test_task_schedulers_list_excludes_archived_builtin_after_delete(client): scheduler = await TaskManager.create_scheduler( @@ -112,6 +161,11 @@ async def test_task_schedulers_list_excludes_archived_builtin_after_delete(clien trigger=TaskTrigger(cron="*/5 * * * *", timezone="Asia/Shanghai"), dedup_key="builtin:test-scheduled-task", ) + execution = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.SCHEDULED, + enqueue=True, + ) response = await client.delete(f"/api/task-schedulers/{scheduler.id}") assert response.status_code == status.HTTP_200_OK @@ -122,12 +176,125 @@ async def test_task_schedulers_list_excludes_archived_builtin_after_delete(clien ids = {item["id"] for item in data["items"]} assert scheduler.id not in ids + archived = await TaskManager.get_scheduler(scheduler.id) + cancelled_execution = await TaskManager.get_execution(execution.id) + assert archived is not None + assert archived.status == SchedulerStatus.ARCHIVED + assert cancelled_execution is not None + assert cancelled_execution.status == TaskStatus.CANCELLED @pytest.mark.asyncio -async def test_mark_execution_viewed_endpoint_updates_delivery_status(client): - from flocks.task.store import TaskStore +async def test_delete_scheduler_cleans_queue_state_for_non_builtin(client): + await TaskManager.start(max_concurrent=1, poll_interval=999, scheduler_interval=999) + scheduler = await TaskManager.create_scheduler( + title="普通计划任务", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger(run_immediately=False), + ) + execution = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=True, + ) + + before = await client.get("/api/task-system/queue/status") + assert before.status_code == status.HTTP_200_OK + assert before.json()["queued"] + before.json()["running"] >= 1 + + response = await client.delete(f"/api/task-schedulers/{scheduler.id}") + assert response.status_code == status.HTTP_200_OK + + after = await client.get("/api/task-system/queue/status") + assert after.status_code == status.HTTP_200_OK + assert after.json()["queued"] == 0 + + executions = await client.get("/api/task-executions") + assert executions.status_code == status.HTTP_200_OK + execution_ids = {item["id"] for item in executions.json()["items"]} + + assert execution.id not in execution_ids + + +@pytest.mark.asyncio +async def test_task_execution_list_accepts_legacy_paused_status_query(client): + scheduler = await TaskManager.create_scheduler( + title="兼容旧 paused 执行查询", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger(run_immediately=False), + ) + execution = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=False, + ) + execution.status = TaskStatus.CANCELLED + execution.completed_at = datetime.now(timezone.utc) + await TaskStore.update_execution(execution) + + response = await client.get("/api/task-executions", params={"status": "paused"}) + + assert response.status_code == status.HTTP_200_OK + ids = {item["id"] for item in response.json()["items"]} + assert execution.id in ids + + +@pytest.mark.asyncio +async def test_batch_cancel_endpoint_cancels_selected_executions(client): + scheduler = await TaskManager.create_scheduler( + title="批量取消接口", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger(run_immediately=False), + ) + cancellable = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=True, + ) + completed = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=False, + ) + completed.status = TaskStatus.COMPLETED + completed.completed_at = datetime.now(timezone.utc) + await TaskStore.update_execution(completed) + response = await client.post( + "/api/task-executions/batch/cancel", + json={"executionIds": [cancellable.id, completed.id]}, + ) + + assert response.status_code == status.HTTP_200_OK + assert response.json()["cancelled"] == 1 + + cancelled_execution = await TaskManager.get_execution(cancellable.id) + assert cancelled_execution is not None + assert cancelled_execution.status == TaskStatus.CANCELLED + + +@pytest.mark.asyncio +async def test_execution_pause_and_resume_endpoints_are_removed(client): + scheduler = await TaskManager.create_scheduler( + title="旧暂停接口", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger(run_immediately=False), + ) + execution = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=True, + ) + + pause_response = await client.post(f"/api/task-executions/{execution.id}/pause") + resume_response = await client.post(f"/api/task-executions/{execution.id}/resume") + + assert pause_response.status_code == status.HTTP_404_NOT_FOUND + assert resume_response.status_code == status.HTTP_404_NOT_FOUND + + +@pytest.mark.asyncio +async def test_mark_execution_viewed_endpoint_updates_delivery_status(client): scheduler = await TaskManager.create_scheduler( title="标记已读", mode=SchedulerMode.ONCE, diff --git a/tests/task/test_task.py b/tests/task/test_task.py index ad5a6992..d0e49040 100644 --- a/tests/task/test_task.py +++ b/tests/task/test_task.py @@ -7,6 +7,7 @@ import pytest +from flocks.cli.commands import task as task_cli_commands import flocks.task.background as background_module import flocks.task.manager as task_manager_module from flocks.server.routes import question as question_routes @@ -184,6 +185,43 @@ async def test_recover_stale_running_execution_unblocks_scheduler(tmp_path: Path assert any(item.id != execution.id and item.status == TaskStatus.QUEUED for item in executions) +@pytest.mark.asyncio +async def test_recover_orphaned_queued_execution_restores_queue_ref(tmp_path: Path): + await TaskManager.start(max_concurrent=1, poll_interval=999, scheduler_interval=999) + # Pause the execution loop so it cannot race the test by claiming the + # execution before we manually simulate the orphan state (queued row + # with no queue ref). + TaskManager.pause_queue() + scheduler = await TaskManager.create_scheduler( + title="恢复孤儿排队任务", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger(run_immediately=False), + workspace_directory=str(tmp_path / "workspace"), + ) + execution = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=True, + ) + await TaskStore.finish_queue_ref(execution.id) + + manager = TaskManager.get() + assert manager is not None + + recovered = await manager._recover_orphaned_queued_executions() + refreshed = await TaskManager.get_execution(execution.id) + + assert recovered == 1 + assert refreshed is not None + assert refreshed.status == TaskStatus.QUEUED + assert refreshed.started_at is None + assert refreshed.completed_at is None + assert refreshed.session_id is None + queue_ref = await TaskStore.get_queue_ref(execution.id) + assert queue_ref is not None + assert queue_ref.status == TaskStatus.QUEUED + + @pytest.mark.asyncio async def test_queue_status_reports_stale_running_execution(tmp_path: Path): await TaskManager.start(max_concurrent=1, poll_interval=999, scheduler_interval=999) @@ -463,6 +501,220 @@ async def test_batch_cancel_counts_only_actual_cancellations(tmp_path: Path): assert cancelled == 1 +@pytest.mark.asyncio +async def test_delete_scheduler_cleans_active_executions_before_cascade( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +): + cancelled_runtime_ids: list[str] = [] + + async def fake_cancel_runtime(_cls, execution): + cancelled_runtime_ids.append(execution.id) + + monkeypatch.setattr( + TaskManager, + "_cancel_execution_runtime", + classmethod(fake_cancel_runtime), + ) + + scheduler = await TaskManager.create_scheduler( + title="删除前清理普通计划", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger(run_immediately=False), + workspace_directory=str(tmp_path / "workspace"), + ) + pending = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=False, + ) + queued = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=True, + ) + running = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=False, + ) + running.status = TaskStatus.RUNNING + running.queued_at = datetime.now(timezone.utc) - timedelta(seconds=3) + running.started_at = datetime.now(timezone.utc) - timedelta(seconds=2) + running.session_id = "ses_delete_running" + await TaskStore.update_execution(running) + await TaskStore.enqueue_execution_ref(running.id) + completed = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=False, + ) + completed.status = TaskStatus.COMPLETED + completed.completed_at = datetime.now(timezone.utc) + await TaskStore.update_execution(completed) + + deleted = await TaskManager.delete_scheduler(scheduler.id) + + assert deleted is True + assert set(cancelled_runtime_ids) == { + pending.id, + queued.id, + running.id, + } + for execution_id in (pending.id, queued.id, running.id, completed.id): + assert await TaskManager.get_execution(execution_id) is None + assert await TaskStore.get_queue_ref(queued.id) is None + assert await TaskStore.get_queue_ref(running.id) is None + + db = await TaskStore.raw_db() + async with db.execute( + "SELECT COUNT(*) FROM task_executions WHERE scheduler_id = ?", + (scheduler.id,), + ) as cur: + assert (await cur.fetchone())[0] == 0 + async with db.execute( + """ + SELECT COUNT(*) FROM task_execution_queue_refs + WHERE execution_id IN (?, ?) + """, + (queued.id, running.id), + ) as cur: + assert (await cur.fetchone())[0] == 0 + + +@pytest.mark.asyncio +async def test_store_init_normalizes_legacy_paused_execution(tmp_path: Path): + scheduler = await TaskManager.create_scheduler( + title="兼容旧 paused 状态", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger(run_immediately=False), + workspace_directory=str(tmp_path / "workspace"), + ) + execution = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=True, + ) + + db = await TaskStore.raw_db() + await db.execute( + """ + UPDATE task_executions + SET status = 'paused', completed_at = NULL, error = NULL + WHERE id = ? + """, + (execution.id,), + ) + await db.commit() + + assert await TaskStore.get_queue_ref(execution.id) is not None + + await TaskStore.close() + await TaskStore.init() + + normalized = await TaskManager.get_execution(execution.id) + + assert normalized is not None + assert normalized.status == TaskStatus.CANCELLED + assert normalized.completed_at is not None + assert normalized.error == "Normalized from legacy paused state." + assert await TaskStore.get_queue_ref(execution.id) is None + + +@pytest.mark.asyncio +async def test_cli_list_tasks_accepts_legacy_paused_status(monkeypatch: pytest.MonkeyPatch): + scheduler = await TaskManager.create_scheduler( + title="CLI 兼容 paused 查询", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger(run_immediately=False), + ) + execution = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.RUN_ONCE, + enqueue=False, + ) + execution.status = TaskStatus.CANCELLED + execution.completed_at = datetime.now(timezone.utc) + await TaskStore.update_execution(execution) + await TaskManager.disable_scheduler(scheduler.id) + + printed: list[object] = [] + monkeypatch.setattr(task_cli_commands.console, "print", lambda *args, **kwargs: printed.append(args)) + + await task_cli_commands._list_tasks("paused", None, 10, "json") + await task_cli_commands._list_tasks("paused", "scheduled", 10, "json") + + assert printed + + +@pytest.mark.asyncio +async def test_delete_builtin_scheduler_archives_and_cancels_active_executions( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +): + cancelled_runtime_ids: list[str] = [] + + async def fake_cancel_runtime(_cls, execution): + cancelled_runtime_ids.append(execution.id) + + monkeypatch.setattr( + TaskManager, + "_cancel_execution_runtime", + classmethod(fake_cancel_runtime), + ) + + scheduler = await TaskManager.create_scheduler( + title="删除前清理内置计划", + mode=SchedulerMode.CRON, + trigger=TaskTrigger(cron="*/5 * * * *", timezone="Asia/Shanghai"), + workspace_directory=str(tmp_path / "workspace"), + dedup_key="builtin:test-delete-cleanup", + ) + queued = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.SCHEDULED, + enqueue=True, + ) + running = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.SCHEDULED, + enqueue=False, + ) + running.status = TaskStatus.RUNNING + running.queued_at = datetime.now(timezone.utc) - timedelta(seconds=3) + running.started_at = datetime.now(timezone.utc) - timedelta(seconds=2) + running.session_id = "ses_builtin_running" + await TaskStore.update_execution(running) + await TaskStore.enqueue_execution_ref(running.id) + completed = await TaskManager.create_execution_from_scheduler( + scheduler, + trigger_type=ExecutionTriggerType.SCHEDULED, + enqueue=False, + ) + completed.status = TaskStatus.COMPLETED + completed.completed_at = datetime.now(timezone.utc) + await TaskStore.update_execution(completed) + + deleted = await TaskManager.delete_scheduler(scheduler.id) + archived = await TaskManager.get_scheduler(scheduler.id) + queued_execution = await TaskManager.get_execution(queued.id) + running_execution = await TaskManager.get_execution(running.id) + completed_execution = await TaskManager.get_execution(completed.id) + + assert deleted is True + assert archived is not None + assert archived.status == SchedulerStatus.ARCHIVED + assert set(cancelled_runtime_ids) == {queued.id, running.id} + assert queued_execution is not None + assert queued_execution.status == TaskStatus.CANCELLED + assert running_execution is not None + assert running_execution.status == TaskStatus.CANCELLED + assert completed_execution is not None + assert completed_execution.status == TaskStatus.COMPLETED + assert await TaskStore.get_queue_ref(queued.id) is None + assert await TaskStore.get_queue_ref(running.id) is None + + @pytest.mark.asyncio async def test_dashboard_counts_exclude_immediate_once_schedulers(tmp_path: Path): await TaskManager.create_scheduler( diff --git a/tests/tool/test_task_center_compat.py b/tests/tool/test_task_center_compat.py new file mode 100644 index 00000000..007e4e4e --- /dev/null +++ b/tests/tool/test_task_center_compat.py @@ -0,0 +1,253 @@ +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest + +import flocks.tool.task.task_center # noqa: F401 +from flocks.config.config import Config +from flocks.storage.storage import Storage +from flocks.task.manager import TaskManager +from flocks.task.models import SchedulerMode, TaskTrigger +from flocks.task.store import TaskStore +from flocks.tool.registry import ToolContext, ToolRegistry + + +def _make_ctx() -> ToolContext: + return ToolContext(session_id="test-session", message_id="test-message", agent="rex") + + +@pytest.fixture(autouse=True) +async def isolated_task_env(tmp_path: pytest.TempPathFactory, monkeypatch: pytest.MonkeyPatch): + data_dir = tmp_path / "flocks_data" + data_dir.mkdir(parents=True, exist_ok=True) + monkeypatch.setenv("FLOCKS_DATA_DIR", str(data_dir)) + + Config._global_config = None + Config._cached_config = None + Storage._db_path = None + Storage._initialized = False + TaskManager._instance = None + TaskManager._startup_error = None + TaskStore._initialized = False + TaskStore._conn = None + + await Storage.init() + await TaskStore.init() + + yield + + await TaskManager.stop() + await TaskStore.close() + Config._global_config = None + Config._cached_config = None + Storage._db_path = None + Storage._initialized = False + TaskManager._instance = None + TaskManager._startup_error = None + TaskStore._initialized = False + TaskStore._conn = None + + +class TestTaskCenterCompatibility: + def test_task_create_schema_allows_legacy_schedule_type(self): + schema = ToolRegistry.get_schema("task_create") + + assert schema is not None + assert "schedule_type" in schema.properties + assert "schedule" in schema.properties + assert "enabled" in schema.properties + assert "action" in schema.properties + assert "type" not in schema.required + + def test_task_update_schema_makes_action_optional_and_exposes_trigger_fields(self): + schema = ToolRegistry.get_schema("task_update") + + assert schema is not None + assert "action" not in schema.required + assert "cron" in schema.properties + assert "run_once" in schema.properties + assert "run_at" in schema.properties + assert "cron_description" in schema.properties + assert "timezone" in schema.properties + assert "user_prompt" in schema.properties + assert "enabled" in schema.properties + + @pytest.mark.asyncio + async def test_task_create_accepts_legacy_schedule_type_alias(self): + result = await ToolRegistry.execute( + "task_create", + ctx=_make_ctx(), + title="每10分钟执行一次", + description="兼容旧 schedule_type 字段", + schedule_type="cron", + cron="*/10 * * * *", + cron_description="每10分钟执行一次", + user_prompt="执行兼容性检查", + ) + + assert result.success is True + + schedulers, total = await TaskManager.list_schedulers(limit=10) + assert total == 1 + scheduler = schedulers[0] + assert scheduler.mode == SchedulerMode.CRON + assert scheduler.trigger.cron == "*/10 * * * *" + assert scheduler.trigger.cron_description == "每10分钟执行一次" + assert scheduler.source.user_prompt == "执行兼容性检查" + + @pytest.mark.asyncio + async def test_task_create_infers_scheduled_type_from_cron(self): + result = await ToolRegistry.execute( + "task_create", + ctx=_make_ctx(), + title="终端输出测试", + description='每4分钟在终端输出"我是 flocks-04"', + cron="*/4 * * * *", + user_prompt="在终端中输出:我是 flocks-04", + ) + + assert result.success is True + + schedulers, total = await TaskManager.list_schedulers(limit=10) + assert total == 1 + scheduler = schedulers[0] + assert scheduler.mode == SchedulerMode.CRON + assert scheduler.trigger.cron == "*/4 * * * *" + + @pytest.mark.asyncio + async def test_task_create_accepts_legacy_schedule_action_and_enabled_fields(self): + result = await ToolRegistry.execute( + "task_create", + ctx=_make_ctx(), + title="终端输出测试", + description='每4分钟在终端输出"我是 flocks-04"', + schedule="*/4 * * * *", + user_prompt="在终端中输出:我是 flocks-04", + action="exec", + enabled="True", + ) + + assert result.success is True + + schedulers, total = await TaskManager.list_schedulers(limit=10) + assert total == 1 + scheduler = schedulers[0] + assert scheduler.mode == SchedulerMode.CRON + assert scheduler.status.value == "active" + assert scheduler.trigger.cron == "*/4 * * * *" + + @pytest.mark.asyncio + async def test_task_update_defaults_to_update_and_accepts_schedule_fields(self): + scheduler = await TaskManager.create_scheduler( + title="原始任务", + description="原始描述", + mode=SchedulerMode.ONCE, + trigger=TaskTrigger( + run_immediately=False, + run_at=datetime(2026, 4, 16, 10, 0, tzinfo=timezone.utc), + ), + ) + + result = await ToolRegistry.execute( + "task_update", + ctx=_make_ctx(), + task_id=scheduler.id, + description="更新后的描述", + cron="*/10 * * * *", + run_once=False, + cron_description="每10分钟执行一次", + timezone="UTC", + user_prompt="更新后的执行内容", + ) + + assert result.success is True + + updated = await TaskManager.get_scheduler(scheduler.id) + assert updated is not None + assert updated.mode == SchedulerMode.CRON + assert updated.description == "更新后的描述" + assert updated.trigger.cron == "*/10 * * * *" + assert updated.trigger.cron_description == "每10分钟执行一次" + assert updated.trigger.timezone == "UTC" + assert updated.source.user_prompt == "更新后的执行内容" + + @pytest.mark.asyncio + async def test_task_create_rejects_run_once_without_time_instead_of_immediate(self): + """run_once=True with no run_at/cron must NOT silently become an immediate task. + + Previously such inputs were inferred as ``queued`` and executed right away, + masking missing-schedule mistakes from legacy clients. + """ + result = await ToolRegistry.execute( + "task_create", + ctx=_make_ctx(), + title="缺少时间参数", + description="只传了 run_once=True 但没给 run_at/cron", + run_once=True, + user_prompt="不应该被立即执行", + ) + + assert result.success is False + assert result.error is not None + assert "run_at" in result.error or "cron" in result.error + + _, total = await TaskManager.list_schedulers(limit=10) + assert total == 0 + + @pytest.mark.asyncio + async def test_task_create_schedule_json_accepts_string_boolean_run_once(self): + """Legacy clients may serialise run_once as the string "false"/"0" — + those must be coerced to False, not treated as truthy.""" + result = await ToolRegistry.execute( + "task_create", + ctx=_make_ctx(), + title="字符串布尔值兼容", + description="run_once 以字符串 'false' 传入", + schedule='{"cron": "*/5 * * * *", "run_once": "false"}', + user_prompt="循环任务", + ) + + assert result.success is True + + schedulers, total = await TaskManager.list_schedulers(limit=10) + assert total == 1 + scheduler = schedulers[0] + assert scheduler.mode == SchedulerMode.CRON + assert scheduler.trigger.cron == "*/5 * * * *" + assert scheduler.trigger.run_immediately is False + + @pytest.mark.asyncio + async def test_task_update_can_disable_and_enable_scheduled_task(self): + scheduler = await TaskManager.create_scheduler( + title="可停止的定时任务", + mode=SchedulerMode.CRON, + trigger=TaskTrigger( + cron="*/5 * * * *", + timezone="Asia/Shanghai", + ), + ) + + disable_result = await ToolRegistry.execute( + "task_update", + ctx=_make_ctx(), + task_id=scheduler.id, + action="stop", + ) + + assert disable_result.success is True + disabled = await TaskManager.get_scheduler(scheduler.id) + assert disabled is not None + assert disabled.status.value == "disabled" + + enable_result = await ToolRegistry.execute( + "task_update", + ctx=_make_ctx(), + task_id=scheduler.id, + enabled=True, + ) + + assert enable_result.success is True + enabled = await TaskManager.get_scheduler(scheduler.id) + assert enabled is not None + assert enabled.status.value == "active" diff --git a/webui/src/api/task.ts b/webui/src/api/task.ts index ff162091..6873e566 100644 --- a/webui/src/api/task.ts +++ b/webui/src/api/task.ts @@ -5,7 +5,7 @@ import client from './client'; // ====================================================================== export type TaskType = 'queued' | 'scheduled'; -export type TaskStatus = 'pending' | 'queued' | 'running' | 'completed' | 'failed' | 'cancelled' | 'paused'; +export type TaskStatus = 'pending' | 'queued' | 'running' | 'completed' | 'failed' | 'cancelled'; export type TaskPriority = 'urgent' | 'high' | 'normal' | 'low'; export type DeliveryStatus = 'unread' | 'notified' | 'viewed'; export type ExecutionMode = 'agent' | 'workflow'; @@ -214,12 +214,6 @@ export const taskAPI = { cancelExecution: (executionId: string) => client.post(`/api/task-executions/${executionId}/cancel`), - pauseExecution: (executionId: string) => - client.post(`/api/task-executions/${executionId}/pause`), - - resumeExecution: (executionId: string) => - client.post(`/api/task-executions/${executionId}/resume`), - retryExecution: (executionId: string) => client.post(`/api/task-executions/${executionId}/retry`), diff --git a/webui/src/locales/en-US/task.json b/webui/src/locales/en-US/task.json index 32231c2f..4816196b 100644 --- a/webui/src/locales/en-US/task.json +++ b/webui/src/locales/en-US/task.json @@ -13,8 +13,7 @@ "running": "Running", "completed": "Completed", "failed": "Failed", - "cancelled": "Cancelled", - "paused": "Paused" + "cancelled": "Cancelled" }, "priority": { "urgent": "Urgent", @@ -92,8 +91,6 @@ "pagination": "{{total}} total, page {{page}}/{{totalPages}}", "detailWaiting": "Waiting for execution...", "detailNoRecord": "No execution record", - "actionPause": "Pause", - "actionResume": "Resume", "actionCancel": "Cancel", "actionRetry": "Retry", "actionRerun": "Re-run", diff --git a/webui/src/locales/zh-CN/task.json b/webui/src/locales/zh-CN/task.json index 0f18e410..3584d2e2 100644 --- a/webui/src/locales/zh-CN/task.json +++ b/webui/src/locales/zh-CN/task.json @@ -13,8 +13,7 @@ "running": "运行中", "completed": "已完成", "failed": "失败", - "cancelled": "已取消", - "paused": "已暂停" + "cancelled": "已取消" }, "priority": { "urgent": "紧急", @@ -92,8 +91,6 @@ "pagination": "共 {{total}} 条,第 {{page}}/{{totalPages}} 页", "detailWaiting": "等待执行中...", "detailNoRecord": "暂无执行记录", - "actionPause": "暂停", - "actionResume": "恢复", "actionCancel": "取消", "actionRetry": "重试", "actionRerun": "重跑", diff --git a/webui/src/pages/Agent/AgentSheet.tsx b/webui/src/pages/Agent/AgentSheet.tsx index 98015918..f136af68 100644 --- a/webui/src/pages/Agent/AgentSheet.tsx +++ b/webui/src/pages/Agent/AgentSheet.tsx @@ -191,7 +191,7 @@ export default function AgentSheet({ agent, onClose, onSaved }: AgentSheetProps) if (messages.length > lastKnownCount) { const lastAssistant = [...messages] .reverse() - .find((m: any) => m.role === 'assistant' && m.finish); + .find((m: any) => (m.info?.role ?? m.role) === 'assistant' && (m.info?.finish ?? m.finish)); if (lastAssistant) { const text = (lastAssistant.parts ?? []) diff --git a/webui/src/pages/Task/QueuedSection.test.tsx b/webui/src/pages/Task/QueuedSection.test.tsx new file mode 100644 index 00000000..048abd50 --- /dev/null +++ b/webui/src/pages/Task/QueuedSection.test.tsx @@ -0,0 +1,217 @@ +import React from 'react'; +import { render, screen, waitFor } from '@testing-library/react'; +import userEvent from '@testing-library/user-event'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import type { TaskExecution, TaskListParams } from '@/api/task'; +import QueuedSection from './QueuedSection'; + +const mocks = vi.hoisted(() => ({ + useTaskExecutions: vi.fn(), + refetch: vi.fn(), + confirm: vi.fn(), + toastError: vi.fn(), + batchCancelExecutions: vi.fn().mockResolvedValue({ data: { cancelled: 0 } }), + batchDeleteExecutions: vi.fn().mockResolvedValue({ data: { deleted: 0 } }), + getExecution: vi.fn(), + markExecutionViewed: vi.fn(), + cancelExecution: vi.fn(), + retryExecution: vi.fn(), + rerunExecution: vi.fn(), + deleteExecution: vi.fn(), +})); + +vi.mock('react-i18next', () => ({ + useTranslation: () => ({ + t: (key: string, opts?: Record) => { + const count = typeof opts?.count === 'number' ? opts.count : 0; + const translations: Record = { + 'queued.filterAll': '全部', + 'queued.filterCompleted': '已完成', + 'queued.filterFailed': '失败', + 'queued.batchCancel': '批量取消', + 'queued.batchDelete': '批量删除', + 'queued.confirmBatchCancelBtn': '确认批量取消', + 'queued.emptyTitle': '暂无任务', + 'queued.emptyDescription': '暂无任务描述', + 'queued.colStatus': '状态', + 'queued.colSource': '来源', + 'queued.colName': '名称', + 'queued.colMode': '模式', + 'queued.colPriority': '优先级', + 'queued.colTime': '时间', + }; + if (key === 'queued.selectedCount') { + return `已选 ${count} 项`; + } + if (key === 'queued.pagination') { + return `共 ${opts?.total ?? 0} 条,第 ${opts?.page ?? 1}/${opts?.totalPages ?? 1} 页`; + } + return translations[key] ?? key; + }, + i18n: { changeLanguage: vi.fn() }, + }), +})); + +vi.mock('@/components/common/Toast', () => ({ + useToast: () => ({ + error: mocks.toastError, + success: vi.fn(), + info: vi.fn(), + warning: vi.fn(), + addToast: vi.fn(), + removeToast: vi.fn(), + toasts: [], + }), +})); + +vi.mock('@/components/common/ConfirmDialog', () => ({ + useConfirm: () => mocks.confirm, +})); + +vi.mock('@/hooks/useTasks', () => ({ + useTaskExecutions: (params?: TaskListParams) => mocks.useTaskExecutions(params), +})); + +vi.mock('@/api/task', () => ({ + taskAPI: { + batchCancelExecutions: mocks.batchCancelExecutions, + batchDeleteExecutions: mocks.batchDeleteExecutions, + getExecution: mocks.getExecution, + markExecutionViewed: mocks.markExecutionViewed, + cancelExecution: mocks.cancelExecution, + retryExecution: mocks.retryExecution, + rerunExecution: mocks.rerunExecution, + deleteExecution: mocks.deleteExecution, + }, +})); + +vi.mock('@/components/common/LoadingSpinner', () => ({ + default: () =>
loading
, +})); + +vi.mock('@/components/common/EmptyState', () => ({ + default: ({ title }: { title: string }) =>
{title}
, +})); + +vi.mock('@/components/common/SessionChat', () => ({ + default: () =>
session-chat
, +})); + +vi.mock('./components', () => ({ + StatusBadge: ({ status }: { status: string }) => {status}, + PriorityBadge: ({ priority }: { priority: string }) => {priority}, + SourceBadge: ({ sourceType }: { sourceType: string }) => {sourceType}, + ModeBadge: ({ mode, agent }: { mode: string; agent: string }) => {mode}:{agent}, + ActionButton: ({ label, onClick }: { label: string; onClick: () => void }) => ( + + ), +})); + +vi.mock('./helpers', () => ({ + PAGE_SIZE: 20, + formatTime: (value?: string) => value ?? '', + formatDuration: (value?: number) => String(value ?? ''), +})); + +function buildExecution( + id: string, + title: string, + status: TaskExecution['status'] = 'queued', +): TaskExecution { + return { + id, + schedulerID: `scheduler-${id}`, + title, + description: '', + priority: 'normal', + source: { sourceType: 'user_conversation' }, + triggerType: 'run_once', + status, + deliveryStatus: 'viewed', + queuedAt: '2026-04-16T00:00:00Z', + startedAt: undefined, + completedAt: status === 'completed' ? '2026-04-16T00:10:00Z' : undefined, + durationMs: undefined, + sessionID: undefined, + resultSummary: undefined, + error: undefined, + executionInputSnapshot: {}, + workspaceDirectory: undefined, + retry: { + maxRetries: 3, + retryCount: 0, + retryDelaySeconds: 60, + retryAfter: undefined, + }, + executionMode: 'agent', + agentName: 'rex', + workflowID: undefined, + createdAt: '2026-04-16T00:00:00Z', + updatedAt: '2026-04-16T00:00:00Z', + }; +} + +describe('QueuedSection', () => { + const allTasks = [ + buildExecution('exec-all-1', '全部任务 1'), + buildExecution('exec-all-2', '全部任务 2'), + ]; + const completedTasks = [ + buildExecution('exec-done-1', '完成任务 1', 'completed'), + buildExecution('exec-done-2', '完成任务 2', 'completed'), + ]; + + beforeEach(() => { + vi.clearAllMocks(); + mocks.confirm.mockResolvedValue(true); + mocks.useTaskExecutions.mockImplementation((params?: TaskListParams) => { + const tasks = params?.status === 'completed' ? completedTasks : allTasks; + return { + tasks, + total: tasks.length, + loading: false, + error: null, + refetch: mocks.refetch, + }; + }); + }); + + it('切换筛选后会清除不可见列表的选中项', async () => { + const user = userEvent.setup(); + + render(); + + const [, firstRowCheckbox] = screen.getAllByRole('checkbox'); + await user.click(firstRowCheckbox); + + expect(screen.getByText('已选 1 项')).toBeInTheDocument(); + expect(screen.getByRole('button', { name: '批量取消' })).toBeInTheDocument(); + + await user.click(screen.getByRole('button', { name: '已完成' })); + + await waitFor(() => { + expect(screen.queryByText('已选 1 项')).not.toBeInTheDocument(); + expect(screen.queryByRole('button', { name: '批量取消' })).not.toBeInTheDocument(); + expect(screen.queryByRole('button', { name: '批量删除' })).not.toBeInTheDocument(); + }); + }); + + it('切换到其他筛选时不会错误保留表头全选状态', async () => { + const user = userEvent.setup(); + + render(); + + const [headerCheckbox] = screen.getAllByRole('checkbox'); + await user.click(headerCheckbox); + + expect((headerCheckbox as HTMLInputElement).checked).toBe(true); + + await user.click(screen.getByRole('button', { name: '已完成' })); + + await waitFor(() => { + const [nextHeaderCheckbox] = screen.getAllByRole('checkbox'); + expect((nextHeaderCheckbox as HTMLInputElement).checked).toBe(false); + }); + }); +}); diff --git a/webui/src/pages/Task/QueuedSection.tsx b/webui/src/pages/Task/QueuedSection.tsx index 7ee8e90b..fbc876e5 100644 --- a/webui/src/pages/Task/QueuedSection.tsx +++ b/webui/src/pages/Task/QueuedSection.tsx @@ -1,7 +1,7 @@ import { useState, useEffect, useCallback, useRef } from 'react'; import { useTranslation } from 'react-i18next'; import { - ListTodo, Play, Pause, RotateCcw, XCircle, Trash2, + ListTodo, Play, RotateCcw, XCircle, Trash2, ChevronLeft, ChevronRight, GripVertical, X, } from 'lucide-react'; import LoadingSpinner from '@/components/common/LoadingSpinner'; @@ -36,6 +36,11 @@ export default function QueuedSection({ onRefreshGlobal }: { onRefreshGlobal: () const { tasks, total, loading, error, refetch } = useTaskExecutions(listParams, { pollInterval: 5000 }); const totalPages = Math.max(1, Math.ceil(total / PAGE_SIZE)); const refresh = useCallback(() => { refetch(); onRefreshGlobal(); }, [refetch, onRefreshGlobal]); + const visibleSelectedIds = tasks + .filter(task => selectedTasks.has(task.id)) + .map(task => task.id); + const hasVisibleSelection = visibleSelectedIds.length > 0; + const allVisibleSelected = tasks.length > 0 && tasks.every(task => selectedTasks.has(task.id)); // Keep detailTask in sync: update from list data when available, // but never clear it just because the task left the current page. @@ -45,6 +50,24 @@ export default function QueuedSection({ onRefreshGlobal }: { onRefreshGlobal: () if (found) setDetailTask(found); }, [tasks, selectedId]); + // Selection is scoped to the current visible list so users never batch + // operate on hidden rows from a previous page or filter. + useEffect(() => { + setSelectedTasks(prev => { + let changed = false; + const visibleIds = new Set(tasks.map(task => task.id)); + const next = new Set(); + prev.forEach(id => { + if (visibleIds.has(id)) { + next.add(id); + } else { + changed = true; + } + }); + return changed ? next : prev; + }); + }, [tasks]); + const markViewedIfNeeded = useCallback(async (task: TaskExecution) => { if (task.status !== 'completed' || task.deliveryStatus !== 'unread') { return task; @@ -60,21 +83,22 @@ export default function QueuedSection({ onRefreshGlobal }: { onRefreshGlobal: () // When the user selects a task, also fetch it directly to be sure // we have the latest data even if it's not on the current page. - const fetchDetailTask = useCallback(async (task: TaskExecution) => { + const fetchDetailTask = useCallback(async (taskId: string) => { try { - const res = await taskAPI.getExecution(task.id); + const res = await taskAPI.getExecution(taskId); const detail = await markViewedIfNeeded(res.data); setDetailTask(detail); } catch { /* ignore — list sync will cover it */ } }, [markViewedIfNeeded]); + // The detail drawer is intentionally kept open across page/filter changes, + // so we must refresh it by ID rather than looking it up in the current page. const refreshWithDetail = useCallback(() => { refresh(); if (selectedId) { - const selected = tasks.find(t => t.id === selectedId); - if (selected) fetchDetailTask(selected); + fetchDetailTask(selectedId); } - }, [refresh, selectedId, fetchDetailTask, tasks]); + }, [refresh, selectedId, fetchDetailTask]); const closeDetail = useCallback(() => { setSelectedId(null); @@ -88,7 +112,7 @@ export default function QueuedSection({ onRefreshGlobal }: { onRefreshGlobal: () } setSelectedId(task.id); setDetailTask(task); - fetchDetailTask(task); + fetchDetailTask(task.id); }, [selectedId, closeDetail, fetchDetailTask]); const handleAction = async (action: string, taskId: string) => { @@ -97,8 +121,6 @@ export default function QueuedSection({ onRefreshGlobal }: { onRefreshGlobal: () case 'cancel': await taskAPI.cancelExecution(taskId); break; - case 'pause': await taskAPI.pauseExecution(taskId); break; - case 'resume': await taskAPI.resumeExecution(taskId); break; case 'retry': await taskAPI.retryExecution(taskId); break; case 'rerun': await taskAPI.rerunExecution(taskId); @@ -117,8 +139,7 @@ export default function QueuedSection({ onRefreshGlobal }: { onRefreshGlobal: () } refresh(); if (selectedId === taskId) { - const selected = tasks.find(t => t.id === taskId); - if (selected) fetchDetailTask(selected); + fetchDetailTask(taskId); } } catch (err: unknown) { toast.error(t('queued.actionFailed'), err instanceof Error ? err.message : String(err)); @@ -129,36 +150,64 @@ export default function QueuedSection({ onRefreshGlobal }: { onRefreshGlobal: () setSelectedTasks(prev => { const s = new Set(prev); s.has(id) ? s.delete(id) : s.add(id); return s; }); const handleBatchCancel = async () => { - if (!selectedTasks.size) return; + if (!hasVisibleSelection) return; + const executionIds = [...new Set(visibleSelectedIds)]; const ok = await confirm({ - description: t('queued.confirmBatchCancel', { count: selectedTasks.size }), + description: t('queued.confirmBatchCancel', { count: executionIds.length }), variant: 'warning', confirmText: t('queued.confirmBatchCancelBtn'), }); if (!ok) return; - const selectedItems = [...selectedTasks] - .map(id => tasks.find(t => t.id === id)) - .filter((task): task is TaskExecution => !!task); - await taskAPI.batchCancelExecutions([...new Set(selectedItems.map(task => task.id))]); - setSelectedTasks(new Set()); - refresh(); + try { + await taskAPI.batchCancelExecutions(executionIds); + setSelectedTasks(new Set()); + refresh(); + } catch (err: unknown) { + toast.error(t('queued.actionFailed'), err instanceof Error ? err.message : String(err)); + } }; const handleBatchDelete = async () => { - if (!selectedTasks.size) return; + if (!hasVisibleSelection) return; + const executionIds = [...new Set(visibleSelectedIds)]; const ok = await confirm({ - description: t('queued.confirmBatchDelete', { count: selectedTasks.size }), + description: t('queued.confirmBatchDelete', { count: executionIds.length }), variant: 'danger', confirmText: t('common:button.delete'), }); if (!ok) return; - const selectedItems = [...selectedTasks] - .map(id => tasks.find(t => t.id === id)) - .filter((task): task is TaskExecution => !!task); - await taskAPI.batchDeleteExecutions([...new Set(selectedItems.map(task => task.id))]); + try { + await taskAPI.batchDeleteExecutions(executionIds); + setSelectedTasks(new Set()); + if (selectedId && selectedTasks.has(selectedId)) closeDetail(); + refresh(); + } catch (err: unknown) { + toast.error(t('queued.actionFailed'), err instanceof Error ? err.message : String(err)); + } + }; + + const handleToggleSelectAll = () => { + setSelectedTasks(prev => { + const next = new Set(prev); + if (allVisibleSelected) { + tasks.forEach(task => next.delete(task.id)); + } else { + tasks.forEach(task => next.add(task.id)); + } + return next; + }); + }; + + const handleFilterChange = (nextFilterKey: string) => { + setFilterKey(nextFilterKey); + setPage(0); + setSelectedTasks(new Set()); + closeDetail(); + }; + + const handlePageChange = (nextPage: number) => { + setPage(nextPage); setSelectedTasks(new Set()); - if (selectedId && selectedTasks.has(selectedId)) closeDetail(); - refresh(); }; if (loading && tasks.length === 0) return
; @@ -171,7 +220,7 @@ export default function QueuedSection({ onRefreshGlobal }: { onRefreshGlobal: () {QUEUED_FILTERS.map(f => ( ))} - {selectedTasks.size > 0 && ( + {hasVisibleSelection && (
- {t('queued.selectedCount', { count: selectedTasks.size })} + {t('queued.selectedCount', { count: visibleSelectedIds.length })}
@@ -204,8 +253,8 @@ export default function QueuedSection({ onRefreshGlobal }: { onRefreshGlobal: () 0 && selectedTasks.size === tasks.length} - onChange={() => selectedTasks.size === tasks.length ? setSelectedTasks(new Set()) : setSelectedTasks(new Set(tasks.map(t => t.id)))} + checked={allVisibleSelected} + onChange={handleToggleSelectAll} className="rounded border-gray-300" /> @@ -253,10 +302,10 @@ export default function QueuedSection({ onRefreshGlobal }: { onRefreshGlobal: () {t('queued.pagination', { total, page: page + 1, totalPages })}
- -
@@ -365,12 +414,6 @@ function QueuedDetailPanel({ task, onClose, onAction, onRefresh }: { )}
- {(task.status === 'running' || task.status === 'queued') && ( - } label={t('queued.actionPause')} onClick={() => onAction('pause', task.id)} color="yellow" /> - )} - {task.status === 'paused' && ( - } label={t('queued.actionResume')} onClick={() => onAction('resume', task.id)} color="green" /> - )} {!['completed', 'cancelled', 'failed'].includes(task.status) && ( } label={t('queued.actionCancel')} onClick={() => onAction('cancel', task.id)} color="gray" /> )} diff --git a/webui/src/pages/Task/TaskSheet.test.tsx b/webui/src/pages/Task/TaskSheet.test.tsx new file mode 100644 index 00000000..af92c3c2 --- /dev/null +++ b/webui/src/pages/Task/TaskSheet.test.tsx @@ -0,0 +1,235 @@ +import React from 'react'; +import { render, screen, waitFor } from '@testing-library/react'; +import userEvent from '@testing-library/user-event'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import TaskSheet from './TaskSheet'; + +const mocks = vi.hoisted(() => ({ + createScheduler: vi.fn(), + updateScheduler: vi.fn(), + toastError: vi.fn(), + agentList: vi.fn(), + workflowList: vi.fn(), + getMessages: vi.fn(), + clientPost: vi.fn(), +})); + +vi.mock('react-i18next', () => ({ + useTranslation: () => ({ + t: (key: string) => { + const translations: Record = { + 'form.titleLabel': '标题 *', + 'form.titlePlaceholder': '输入任务标题', + 'form.descLabel': '描述', + 'form.descPlaceholder': '任务描述(可选)', + 'form.priorityLabel': '优先级', + 'form.scheduleKindLabel': '调度', + 'form.modeLabel': '执行模式', + 'form.scheduleConfig': '调度配置', + 'form.freqRecurringLabel': '执行频率', + 'form.selectFrequency': '请选择频率', + 'form.customOption': '自定义…', + 'form.timezoneLabel': '时区', + 'form.cronDescLabel': '周期描述', + 'form.cronDescHint': '(展示用,留空则自动生成)', + 'form.cronDescPlaceholder': '例:每天早上9点', + 'form.additionalInfoLabel': '任务补充信息', + 'form.additionalInfoHint': '(Agent 执行时的具体指令,可选)', + 'form.additionalInfoPlaceholder': '例:查询 threatbook.cn 的威胁情报,生成详细报告', + 'form.immediateOption': '立即执行', + 'form.onceAtTimeOption': '指定时间', + 'form.recurringOption': '循环执行', + 'form.agentName': 'Agent 名称', + 'form.timezoneShanghai': 'Asia/Shanghai(北京时间 UTC+8)', + 'form.normalLabel': '普通', + 'form.selectWorkflow': '请选择 Workflow', + 'taskSheet.entityType': '任务', + 'taskSheet.createFailed': '创建失败', + 'taskSheet.saveFailed': '保存失败', + }; + return translations[key] ?? key; + }, + i18n: { language: 'zh-CN' }, + }), +})); + +vi.mock('@/api/task', () => ({ + taskAPI: { + createScheduler: mocks.createScheduler, + updateScheduler: mocks.updateScheduler, + }, +})); + +vi.mock('@/api/agent', () => ({ + agentAPI: { + list: mocks.agentList, + }, +})); + +vi.mock('@/api/workflow', () => ({ + workflowAPI: { + list: mocks.workflowList, + }, +})); + +vi.mock('@/api/session', () => ({ + sessionApi: { + getMessages: mocks.getMessages, + }, +})); + +vi.mock('@/api/client', () => ({ + default: { + post: mocks.clientPost, + }, +})); + +vi.mock('@/components/common/Toast', () => ({ + useToast: () => ({ + error: mocks.toastError, + success: vi.fn(), + info: vi.fn(), + warning: vi.fn(), + addToast: vi.fn(), + removeToast: vi.fn(), + toasts: [], + }), +})); + +vi.mock('@/components/common/EntitySheet', () => ({ + __esModule: true, + default: ({ + children, + onSubmit, + submitDisabled, + submitLoading, + }: { + children: React.ReactNode; + onSubmit: () => void | Promise; + submitDisabled?: boolean; + submitLoading?: boolean; + }) => ( +
+ + {children} +
+ ), + useEntitySheet: () => ({ + openRex: vi.fn(), + openTest: vi.fn(), + }), +})); + +vi.mock('@/components/common/PillGroup', () => ({ + __esModule: true, + default: ({ + options, + value, + onChange, + }: { + options: Array<{ value: string; label: string }>; + value: string; + onChange: (value: string) => void; + }) => ( +
+ {options.map((option) => ( + + ))} +
+ ), +})); + +vi.mock('@/hooks/useTasks', () => ({ + useTaskExecutionsByScheduler: () => ({ + records: [], + total: 0, + loading: false, + error: null, + refetch: vi.fn(), + }), +})); + +vi.mock('@/utils/agentDisplay', () => ({ + getAgentDisplayDescription: () => '', +})); + +vi.mock('./components', () => ({ + StatusBadge: ({ status }: { status: string }) => {status}, +})); + +vi.mock('./helpers', () => ({ + CRON_PRESETS: [ + { key: 'daily0900', value: '0 9 * * *' }, + { key: 'custom', value: '__custom__' }, + ], + describeCron: (cron: string) => `cron:${cron}`, + formatDuration: (value?: number) => String(value ?? ''), + formatTime: (value?: string) => value ?? '', +})); + +describe('TaskSheet', () => { + beforeEach(() => { + vi.clearAllMocks(); + mocks.createScheduler.mockResolvedValue({ data: { id: 'task_1' } }); + mocks.updateScheduler.mockResolvedValue({ data: { id: 'task_1' } }); + mocks.agentList.mockImplementation(() => new Promise(() => {})); + mocks.workflowList.mockImplementation(() => new Promise(() => {})); + mocks.getMessages.mockResolvedValue([]); + mocks.clientPost.mockResolvedValue({ data: {} }); + }); + + it('创建循环任务时展示并提交 timezone 与 cronDescription', async () => { + const user = userEvent.setup(); + const onClose = vi.fn(); + const onSaved = vi.fn(); + + render( + , + ); + + const timezoneSelect = screen + .getAllByRole('combobox') + .find((element) => (element as HTMLSelectElement).value === 'Asia/Shanghai'); + expect(timezoneSelect).toBeDefined(); + expect(screen.getByPlaceholderText('例:每天早上9点')).toBeInTheDocument(); + + await user.type(screen.getByPlaceholderText('输入任务标题'), '每天同步情报'); + await user.type(screen.getByPlaceholderText('0 9 * * 1-5'), '0 8 * * *'); + await user.selectOptions(timezoneSelect as HTMLSelectElement, 'UTC'); + await user.type(screen.getByPlaceholderText('cron:0 8 * * *'), '每天 UTC 08:00'); + + await user.click(screen.getByRole('button', { name: '提交' })); + + await waitFor(() => { + expect(mocks.createScheduler).toHaveBeenCalledWith( + expect.objectContaining({ + title: '每天同步情报', + type: 'scheduled', + priority: 'normal', + executionMode: 'agent', + agentName: 'rex', + cron: '0 8 * * *', + timezone: 'UTC', + cronDescription: '每天 UTC 08:00', + }), + ); + }); + + expect(onSaved).toHaveBeenCalledTimes(1); + expect(onClose).toHaveBeenCalledTimes(1); + }); +}); diff --git a/webui/src/pages/Task/TaskSheet.tsx b/webui/src/pages/Task/TaskSheet.tsx index 8e0b93c1..6f711cab 100644 --- a/webui/src/pages/Task/TaskSheet.tsx +++ b/webui/src/pages/Task/TaskSheet.tsx @@ -165,7 +165,9 @@ export default function TaskSheet({ task, defaultScheduleKind = 'recurring', onC params.runOnce = true; params.runAt = formData.runAt ? new Date(formData.runAt).toISOString() : undefined; } else if (isRecurring) { - params.cron = effectiveCron; + params.cron = effectiveCron.trim(); + params.timezone = formData.timezone; + params.cronDescription = formData.cronDescription.trim() || undefined; } await taskAPI.createScheduler(params); } @@ -206,7 +208,7 @@ ${fields} if (messages.length > initialCount) { const lastAssistant = [...messages] .reverse() - .find((m: any) => m.role === 'assistant' && m.finish); + .find((m: any) => (m.info?.role ?? m.role) === 'assistant' && (m.info?.finish ?? m.finish)); if (lastAssistant) { const text = (lastAssistant.parts ?? []) @@ -413,7 +415,7 @@ function TaskFormContent({ <>
update({ timezone: e.target.value })} - className="w-full px-3 py-2 border border-gray-300 rounded-lg outline-none text-sm bg-white" - > - - - - - - -
-
- - update({ cronDescription: e.target.value })} - placeholder={effectiveCron ? describeCron(effectiveCron) : t('form.cronDescPlaceholder')} - className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-purple-500 outline-none text-sm bg-white" - /> -
- - )} + {/* Timezone + custom description */} +
+ + +
+
+ + update({ cronDescription: e.target.value })} + placeholder={effectiveCron ? describeCron(effectiveCron) : t('form.cronDescPlaceholder')} + className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-purple-500 outline-none text-sm bg-white" + /> +
)}
diff --git a/webui/src/pages/Task/helpers.ts b/webui/src/pages/Task/helpers.ts index 5438b72b..b8516c8b 100644 --- a/webui/src/pages/Task/helpers.ts +++ b/webui/src/pages/Task/helpers.ts @@ -8,7 +8,6 @@ export const STATUS_CONFIG: Record completed: { icon: '\u2705', color: 'text-green-500' }, failed: { icon: '\u274c', color: 'text-red-500' }, cancelled: { icon: '\ud83d\udeab', color: 'text-gray-400' }, - paused: { icon: '\u23f8\ufe0f', color: 'text-yellow-500' }, }; export const PRIORITY_CONFIG: Record = {