Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions flocks/cli/commands/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,19 @@ async def _list_tasks(status_val, type_val, limit, fmt):
scheduler_status = None
if status_val == "running":
scheduler_status = SchedulerStatus.ACTIVE
elif status_val == "paused":
elif status_val in ("paused", "disabled"):
scheduler_status = SchedulerStatus.DISABLED
tasks, total = await TaskManager.list_schedulers(status=scheduler_status, limit=limit)
else:
task_status = None
if status_val:
mapped_status = "cancelled" if status_val == "paused" else status_val
try:
task_status = TaskStatus(mapped_status)
except ValueError as exc:
raise typer.BadParameter(f"Invalid execution status: {status_val}") from exc
tasks, total = await TaskManager.list_executions(
status=TaskStatus(status_val) if status_val else None,
status=task_status,
limit=limit,
)

Expand All @@ -121,7 +128,7 @@ async def _list_tasks(status_val, type_val, limit, fmt):
status_icon = {
"pending": "⏳", "queued": "📋", "running": "🟢",
"completed": "✅", "failed": "❌", "cancelled": "🚫",
"paused": "⏸️", "stopped": "🛑",
"disabled": "⏸️", "stopped": "🛑",
}
for t in tasks:
icon = status_icon.get(t.status.value, "·")
Expand Down
221 changes: 141 additions & 80 deletions flocks/server/routes/task_entities.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Execution-centric task scheduler/execution routes."""

from typing import List, Optional
from enum import Enum
from typing import List, Optional, Type

from fastapi import APIRouter, HTTPException, Query, status
from pydantic import BaseModel, ConfigDict, Field
Expand Down Expand Up @@ -66,6 +67,77 @@ class PaginatedResponse(BaseModel):
limit: int


def _parse_enum(
value: Optional[str],
enum_cls: Type[Enum],
*,
label: str,
legacy_aliases: Optional[dict[str, object]] = None,
):
if not value:
return None
mapped = legacy_aliases.get(value, value) if legacy_aliases else value
try:
return enum_cls(mapped)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail=f"Invalid {label}: {value}",
) from exc


def _parse_scheduler_status_filter(status_filter: Optional[str]):
    """Resolve a scheduler-status query param to a SchedulerStatus member.

    Accepts the legacy spellings ``running`` (-> ACTIVE) and ``paused``
    (-> DISABLED); returns None when no filter was supplied.
    """
    from flocks.task.models import SchedulerStatus

    aliases = {
        "running": SchedulerStatus.ACTIVE,
        "paused": SchedulerStatus.DISABLED,
    }
    return _parse_enum(
        status_filter,
        SchedulerStatus,
        label="scheduler status",
        legacy_aliases=aliases,
    )


def _parse_execution_status_filter(status_filter: Optional[str]):
    """Resolve an execution-status query param to a TaskStatus member.

    The legacy value ``paused`` is mapped to CANCELLED; returns None
    when no filter was supplied.
    """
    from flocks.task.models import TaskStatus

    aliases = {"paused": TaskStatus.CANCELLED}
    return _parse_enum(
        status_filter,
        TaskStatus,
        label="execution status",
        legacy_aliases=aliases,
    )


def _parse_priority(priority: Optional[str]):
    """Resolve a priority string to a TaskPriority member (None when absent)."""
    from flocks.task.models import TaskPriority

    return _parse_enum(
        priority,
        TaskPriority,
        label="task priority",
    )


def _parse_delivery_status(delivery_status: Optional[str]):
    """Resolve a delivery-status string to a DeliveryStatus member (None when absent)."""
    from flocks.task.models import DeliveryStatus

    return _parse_enum(
        delivery_status,
        DeliveryStatus,
        label="delivery status",
    )


def _parse_execution_mode(execution_mode: Optional[str]):
    """Resolve an execution-mode string to an ExecutionMode member (None when absent)."""
    from flocks.task.models import ExecutionMode

    return _parse_enum(
        execution_mode,
        ExecutionMode,
        label="execution mode",
    )


def _parse_task_type(task_type: str) -> str:
if task_type not in {"queued", "scheduled"}:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail=f"Invalid task type: {task_type}",
)
return task_type


@router.get("/task-system/notice")
async def get_task_system_notice():
from flocks.task.manager import TaskManager
Expand Down Expand Up @@ -114,11 +186,10 @@ async def list_schedulers(
limit: int = Query(20, ge=1, le=100),
):
from flocks.task.manager import TaskManager
from flocks.task.models import SchedulerStatus, TaskPriority

items, total = await TaskManager.list_schedulers(
status=SchedulerStatus(status_filter) if status_filter else None,
priority=TaskPriority(priority) if priority else None,
status=_parse_scheduler_status_filter(status_filter),
priority=_parse_priority(priority),
scheduled_only=scheduled_only,
sort_by=sort_by,
sort_order=sort_order,
Expand All @@ -137,43 +208,50 @@ async def list_schedulers(
async def create_scheduler(req: SchedulerCreateRequest):
from flocks.task.manager import TaskManager
from flocks.task.models import (
ExecutionMode,
SchedulerMode,
TaskPriority,
TaskSource,
TaskTrigger,
build_schedule,
)

if req.type == "queued":
trigger = TaskTrigger(runImmediately=True)
mode = SchedulerMode.ONCE
elif req.run_once:
trigger = build_schedule(
run_once=True,
run_at=req.run_at,
cron=req.cron,
cron_description=req.cron_description,
timezone=req.timezone,
)
mode = SchedulerMode.ONCE
else:
trigger = build_schedule(
run_once=False,
cron=req.cron,
cron_description=req.cron_description,
timezone=req.timezone,
)
mode = SchedulerMode.CRON
task_type = _parse_task_type(req.type)
priority = _parse_priority(req.priority)
execution_mode = _parse_execution_mode(req.execution_mode)
try:
if task_type == "queued":
trigger = TaskTrigger(runImmediately=True)
mode = SchedulerMode.ONCE
elif req.run_once:
trigger = build_schedule(
run_once=True,
run_at=req.run_at,
cron=req.cron,
cron_description=req.cron_description,
timezone=req.timezone,
)
mode = SchedulerMode.ONCE
else:
trigger = build_schedule(
run_once=False,
cron=req.cron,
cron_description=req.cron_description,
timezone=req.timezone,
)
mode = SchedulerMode.CRON
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail=str(exc),
) from exc

scheduler = await TaskManager.create_scheduler(
title=req.title,
description=req.description,
mode=mode,
priority=TaskPriority(req.priority),
priority=priority,
source=TaskSource(user_prompt=req.user_prompt) if req.user_prompt else None,
trigger=trigger,
execution_mode=ExecutionMode(req.execution_mode),
execution_mode=execution_mode,
agent_name=req.agent_name,
workflow_id=req.workflow_id,
skills=req.skills,
Expand All @@ -198,29 +276,34 @@ async def get_scheduler(scheduler_id: str):
@router.put("/task-schedulers/{scheduler_id}")
async def update_scheduler(scheduler_id: str, req: SchedulerUpdateRequest):
from flocks.task.manager import TaskManager
from flocks.task.models import ExecutionMode, TaskPriority

fields = {k: v for k, v in req.model_dump(exclude_none=True).items()}
if "priority" in fields:
fields["priority"] = TaskPriority(fields["priority"])
fields["priority"] = _parse_priority(fields["priority"])
if "execution_mode" in fields:
fields["execution_mode"] = ExecutionMode(fields["execution_mode"])
fields["execution_mode"] = _parse_execution_mode(fields["execution_mode"])
cron = fields.pop("cron", None)
tz = fields.pop("timezone", None)
cron_desc = fields.pop("cron_description", None)
run_once = fields.pop("run_once", None)
run_at = fields.pop("run_at", None)
user_prompt = fields.pop("user_prompt", None)
scheduler = await TaskManager.update_scheduler_with_trigger(
scheduler_id,
fields=fields,
cron=cron,
timezone=tz,
cron_description=cron_desc,
run_once=run_once,
run_at=run_at,
user_prompt=user_prompt,
)
try:
scheduler = await TaskManager.update_scheduler_with_trigger(
scheduler_id,
fields=fields,
cron=cron,
timezone=tz,
cron_description=cron_desc,
run_once=run_once,
run_at=run_at,
user_prompt=user_prompt,
)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail=str(exc),
) from exc
if not scheduler:
raise HTTPException(404, "Task scheduler not found")
return scheduler.model_dump(mode="json", by_alias=True)
Expand Down Expand Up @@ -296,13 +379,12 @@ async def list_executions(
limit: int = Query(20, ge=1, le=100),
):
from flocks.task.manager import TaskManager
from flocks.task.models import DeliveryStatus, TaskPriority, TaskStatus

items, total = await TaskManager.list_executions(
scheduler_id=scheduler_id,
status=TaskStatus(status_filter) if status_filter else None,
priority=TaskPriority(priority) if priority else None,
delivery_status=DeliveryStatus(delivery_status) if delivery_status else None,
status=_parse_execution_status_filter(status_filter),
priority=_parse_priority(priority),
delivery_status=_parse_delivery_status(delivery_status),
sort_by=sort_by,
sort_order=sort_order,
offset=offset,
Expand All @@ -316,6 +398,20 @@ async def list_executions(
)


@router.post("/task-executions/batch/cancel")
async def batch_cancel(req: BatchRequest):
    """Cancel every execution listed in the request body.

    Returns the number of executions actually cancelled.
    """
    from flocks.task.manager import TaskManager

    cancelled_count = await TaskManager.batch_cancel(req.execution_ids)
    return {"cancelled": cancelled_count}


@router.post("/task-executions/batch/delete")
async def batch_delete(req: BatchRequest):
    """Delete every execution listed in the request body.

    Returns the number of executions actually deleted.
    """
    from flocks.task.manager import TaskManager

    deleted_count = await TaskManager.batch_delete(req.execution_ids)
    return {"deleted": deleted_count}


@router.get("/task-executions/{execution_id}")
async def get_execution(execution_id: str):
from flocks.task.manager import TaskManager
Expand Down Expand Up @@ -345,27 +441,6 @@ async def cancel_execution(execution_id: str):
raise HTTPException(404, "Task execution not found")
return execution.model_dump(mode="json", by_alias=True)


@router.post("/task-executions/{execution_id}/pause")
async def pause_execution(execution_id: str):
    """Pause the given task execution; 404 when the id is unknown.

    NOTE(review): this endpoint is on the removed side of the diff —
    the PR deletes pause/resume in favor of cancel + disabled status.
    """
    from flocks.task.manager import TaskManager

    execution = await TaskManager.pause_execution(execution_id)
    if not execution:
        raise HTTPException(404, "Task execution not found")
    return execution.model_dump(mode="json", by_alias=True)


@router.post("/task-executions/{execution_id}/resume")
async def resume_execution(execution_id: str):
    """Resume a paused task execution; 404 when the id is unknown.

    NOTE(review): this endpoint is on the removed side of the diff —
    the PR deletes pause/resume in favor of cancel + disabled status.
    """
    from flocks.task.manager import TaskManager

    execution = await TaskManager.resume_execution(execution_id)
    if not execution:
        raise HTTPException(404, "Task execution not found")
    return execution.model_dump(mode="json", by_alias=True)


@router.post("/task-executions/{execution_id}/retry")
async def retry_execution(execution_id: str):
from flocks.task.manager import TaskManager
Expand Down Expand Up @@ -395,17 +470,3 @@ async def delete_execution(execution_id: str):
return {"ok": True}


@router.post("/task-executions/batch/cancel")
async def batch_cancel(req: BatchRequest):
    """Cancel the listed executions; responds with the count cancelled.

    NOTE(review): removed side of the diff — the PR moves this route
    above the ``/{execution_id}`` routes so ``batch`` is not captured
    as a path parameter.
    """
    from flocks.task.manager import TaskManager

    return {"cancelled": await TaskManager.batch_cancel(req.execution_ids)}


@router.post("/task-executions/batch/delete")
async def batch_delete(req: BatchRequest):
    """Delete the listed executions; responds with the count deleted.

    NOTE(review): removed side of the diff — the PR moves this route
    above the ``/{execution_id}`` routes so ``batch`` is not captured
    as a path parameter.
    """
    from flocks.task.manager import TaskManager

    return {"deleted": await TaskManager.batch_delete(req.execution_ids)}


12 changes: 9 additions & 3 deletions flocks/task/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,19 @@ async def dispatch(
execution: TaskExecution,
scheduler: TaskScheduler,
) -> TaskExecution:
# The execution row was already flipped to RUNNING atomically with the
# queue ref inside TaskStore.claim_next_queue_execution. We only need
# to make sure the in-memory object reflects that and to persist the
# session_id once the task session has been created.
if execution.status != TaskStatus.RUNNING:
execution.status = TaskStatus.RUNNING
if execution.started_at is None:
execution.started_at = datetime.now(timezone.utc)

session_id: Optional[str] = None
if execution.execution_mode == ExecutionMode.AGENT:
session_id = await cls._create_task_session(execution, scheduler)

started_at = datetime.now(timezone.utc)
execution.status = TaskStatus.RUNNING
execution.started_at = started_at
execution.session_id = session_id
await TaskStore.update_execution(execution)

Expand Down
Loading
Loading