feat: add job rerun endpoint and jobsctl CLI#79
Conversation
📝 WalkthroughWalkthroughThis PR adds job rerun functionality with a new CLI tool, introduces exception-based error handling for Docuseal processing, and expands worker documentation. The changes include a backend rerun endpoint with ULID-based idempotency keys, comprehensive tests, and updated local development guidance. Changes
Sequence Diagram(s)sequenceDiagram
actor User
participant CLI as jobsctl CLI
participant API as Backend API
participant JobQueue as Job Queue
participant Worker as Worker
User->>CLI: jobsctl rerun {job_id}
CLI->>API: GET /jobs/{job_id}
API->>API: Fetch source job by ID
API-->>CLI: Return job details & payload
CLI->>API: POST /jobs/{job_id}/rerun
API->>API: Resolve job function from mapping
API->>API: Validate payload
API->>API: Generate ULID idempotency key
API->>JobQueue: Enqueue new job
JobQueue-->>API: Job enqueued
API-->>CLI: Return new job details
CLI-->>User: Display new job ID (exit 0)
JobQueue->>Worker: Dequeue job
Worker->>Worker: Execute job function
Worker-->>JobQueue: Mark complete
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/worker/src/five08/backend/api.py`:
- Around line 686-690: The current code coerces non-dict source_job.payload to
{} and silently supplies defaults for missing args/kwargs, which can produce an
incorrect rerun; update the validation to reject malformed payloads: first check
that source_job.payload is a dict and return a 400 JSONResponse if not, then
require that payload contains "args" and "kwargs" keys of types list and dict
respectively (use the existing raw_payload, raw_args, raw_kwargs identifiers)
and return JSONResponse({"error":"invalid_job_payload"}, status_code=400) if
those checks fail so only well-formed jobs are enqueued.
In `@tests/unit/test_jobcli.py`:
- Around line 28-115: Tests depend on environment vars (API_SHARED_SECRET,
WORKER_API_BASE_URL) and localhost defaults which causes flaky failures; update
the tests to explicitly control the env and API base URL: in
test_jobsctl_status_fails_without_secret clear/unset API_SHARED_SECRET (use
monkeypatch.delenv or equivalent) before calling jobcli.run, and in tests that
assert request URLs (test_jobsctl_status_calls_jobs_endpoint,
test_jobsctl_rerun_calls_rerun_endpoint,
test_jobsctl_rerun_uses_default_secret_from_environment,
test_jobsctl_status_prints_error_when_api_returns_error) either pass the
explicit --api-url arg to jobcli.run or monkeypatch WORKER_API_BASE_URL to
"http://localhost:8090" so assertions against called["url"] remain
deterministic; ensure any tests that rely on a default secret set the env via
monkeypatch.setenv("API_SHARED_SECRET", ...) instead of depending on external
state.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
README.mdapps/worker/README.mdapps/worker/pyproject.tomlapps/worker/src/five08/backend/api.pyapps/worker/src/five08/jobcli.pyapps/worker/src/five08/worker/actors.pyapps/worker/src/five08/worker/crm/docuseal_processor.pytests/unit/test_backend_api.pytests/unit/test_docuseal_processor.pytests/unit/test_jobcli.pytests/unit/test_worker_actors.py
| raw_payload = source_job.payload if isinstance(source_job.payload, dict) else {} | ||
| raw_args = raw_payload.get("args", []) | ||
| raw_kwargs = raw_payload.get("kwargs", {}) | ||
| if not isinstance(raw_args, list) or not isinstance(raw_kwargs, dict): | ||
| return JSONResponse({"error": "invalid_job_payload"}, status_code=400) |
There was a problem hiding this comment.
Reject malformed source payloads before enqueueing reruns.
Line 686 currently coerces non-dict payloads to {}, and missing args/kwargs fall back to []/{}. That can enqueue a rerun with the wrong call signature instead of duplicating the source job.
✅ Suggested fix
- raw_payload = source_job.payload if isinstance(source_job.payload, dict) else {}
- raw_args = raw_payload.get("args", [])
- raw_kwargs = raw_payload.get("kwargs", {})
- if not isinstance(raw_args, list) or not isinstance(raw_kwargs, dict):
+ raw_payload = source_job.payload
+ if not isinstance(raw_payload, dict):
+ return JSONResponse({"error": "invalid_job_payload"}, status_code=400)
+
+ if "args" not in raw_payload or "kwargs" not in raw_payload:
+ return JSONResponse({"error": "invalid_job_payload"}, status_code=400)
+
+ raw_args = raw_payload["args"]
+ raw_kwargs = raw_payload["kwargs"]
+ if not isinstance(raw_args, list) or not isinstance(raw_kwargs, dict):
return JSONResponse({"error": "invalid_job_payload"}, status_code=400)As per coding guidelines, apps/worker/src/five08/backend/api.py: "Worker ingest endpoints must validate input, persist jobs, enqueue, and return 202 quickly without performing long processing".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/worker/src/five08/backend/api.py` around lines 686 - 690, The current
code coerces non-dict source_job.payload to {} and silently supplies defaults
for missing args/kwargs, which can produce an incorrect rerun; update the
validation to reject malformed payloads: first check that source_job.payload is
a dict and return a 400 JSONResponse if not, then require that payload contains
"args" and "kwargs" keys of types list and dict respectively (use the existing
raw_payload, raw_args, raw_kwargs identifiers) and return
JSONResponse({"error":"invalid_job_payload"}, status_code=400) if those checks
fail so only well-formed jobs are enqueued.
| def test_jobsctl_status_calls_jobs_endpoint() -> None: | ||
| with patch("five08.jobcli.httpx.request") as mock_request: | ||
| mock_request.return_value = _json_response( | ||
| status_code=200, | ||
| payload={"status": "succeeded", "job_id": "job-123"}, | ||
| method="GET", | ||
| url="http://localhost:8090/jobs/job-123", | ||
| ) | ||
|
|
||
| exit_code = jobcli.run(["--secret", "test-secret", "status", "job-123"]) | ||
|
|
||
| assert exit_code == 0 | ||
| mock_request.assert_called_once() | ||
| called = mock_request.call_args.kwargs | ||
| assert called["method"] == "GET" | ||
| assert called["url"] == "http://localhost:8090/jobs/job-123" | ||
| assert called["headers"] == {"X-API-Secret": "test-secret"} | ||
|
|
||
|
|
||
| def test_jobsctl_rerun_calls_rerun_endpoint() -> None: | ||
| with patch("five08.jobcli.httpx.request") as mock_request: | ||
| mock_request.return_value = _json_response( | ||
| status_code=200, | ||
| payload={ | ||
| "job_id": "job-new", | ||
| "source_job_id": "job-old", | ||
| "status": "queued", | ||
| }, | ||
| method="POST", | ||
| url="http://localhost:8090/jobs/job-old/rerun", | ||
| ) | ||
|
|
||
| exit_code = jobcli.run(["--secret", "test-secret", "rerun", "job-old"]) | ||
|
|
||
| assert exit_code == 0 | ||
| called = mock_request.call_args.kwargs | ||
| assert called["method"] == "POST" | ||
| assert called["url"] == "http://localhost:8090/jobs/job-old/rerun" | ||
|
|
||
|
|
||
| def test_jobsctl_rerun_uses_default_secret_from_environment( | ||
| monkeypatch: pytest.MonkeyPatch, | ||
| ) -> None: | ||
| monkeypatch.setenv("API_SHARED_SECRET", "from-env") | ||
| with patch("five08.jobcli.httpx.request") as mock_request: | ||
| mock_request.return_value = _json_response( | ||
| status_code=200, | ||
| payload={ | ||
| "job_id": "job-new", | ||
| "source_job_id": "job-old", | ||
| "status": "queued", | ||
| }, | ||
| method="POST", | ||
| url="http://localhost:8090/jobs/job-old/rerun", | ||
| ) | ||
|
|
||
| exit_code = jobcli.run( | ||
| ["--api-url", "http://localhost:8090", "rerun", "job-old"] | ||
| ) | ||
|
|
||
| assert exit_code == 0 | ||
| assert mock_request.call_args.kwargs["headers"]["X-API-Secret"] == "from-env" | ||
|
|
||
|
|
||
| def test_jobsctl_status_prints_error_when_api_returns_error( | ||
| capsys: pytest.CaptureFixture[str], | ||
| ) -> None: | ||
| with patch("five08.jobcli.httpx.request") as mock_request: | ||
| mock_request.return_value = _json_response( | ||
| status_code=404, | ||
| payload={"error": "job_not_found"}, | ||
| method="GET", | ||
| url="http://localhost:8090/jobs/job-missing", | ||
| ) | ||
|
|
||
| exit_code = jobcli.run(["--secret", "test-secret", "status", "job-missing"]) | ||
|
|
||
| assert exit_code == 1 | ||
| assert "API error 404: job_not_found" in capsys.readouterr().err | ||
|
|
||
|
|
||
| def test_jobsctl_status_fails_without_secret( | ||
| capsys: pytest.CaptureFixture[str], | ||
| ) -> None: | ||
| exit_code = jobcli.run(["status", "job-123"]) | ||
|
|
||
| assert exit_code == 1 | ||
| assert "Missing API secret" in capsys.readouterr().err |
There was a problem hiding this comment.
These CLI tests are environment-dependent and can flake.
Line 112 assumes no API_SHARED_SECRET in the environment, and several tests assert localhost URLs while relying on default WORKER_API_BASE_URL. In CI/dev shells with those vars set, assertions can fail nondeterministically.
✅ Suggested hardening
-def test_jobsctl_status_calls_jobs_endpoint() -> None:
+def test_jobsctl_status_calls_jobs_endpoint(monkeypatch: pytest.MonkeyPatch) -> None:
+ monkeypatch.delenv("WORKER_API_BASE_URL", raising=False)
with patch("five08.jobcli.httpx.request") as mock_request:
@@
- exit_code = jobcli.run(["--secret", "test-secret", "status", "job-123"])
+ exit_code = jobcli.run(
+ ["--api-url", "http://localhost:8090", "--secret", "test-secret", "status", "job-123"]
+ )
-def test_jobsctl_rerun_calls_rerun_endpoint() -> None:
+def test_jobsctl_rerun_calls_rerun_endpoint(monkeypatch: pytest.MonkeyPatch) -> None:
+ monkeypatch.delenv("WORKER_API_BASE_URL", raising=False)
@@
- exit_code = jobcli.run(["--secret", "test-secret", "rerun", "job-old"])
+ exit_code = jobcli.run(
+ ["--api-url", "http://localhost:8090", "--secret", "test-secret", "rerun", "job-old"]
+ )
-def test_jobsctl_status_fails_without_secret(
- capsys: pytest.CaptureFixture[str],
+def test_jobsctl_status_fails_without_secret(
+ monkeypatch: pytest.MonkeyPatch,
+ capsys: pytest.CaptureFixture[str],
) -> None:
- exit_code = jobcli.run(["status", "job-123"])
+ monkeypatch.delenv("API_SHARED_SECRET", raising=False)
+ exit_code = jobcli.run(["--api-url", "http://localhost:8090", "status", "job-123"])🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/test_jobcli.py` around lines 28 - 115, Tests depend on environment
vars (API_SHARED_SECRET, WORKER_API_BASE_URL) and localhost defaults which
causes flaky failures; update the tests to explicitly control the env and API
base URL: in test_jobsctl_status_fails_without_secret clear/unset
API_SHARED_SECRET (use monkeypatch.delenv or equivalent) before calling
jobcli.run, and in tests that assert request URLs
(test_jobsctl_status_calls_jobs_endpoint,
test_jobsctl_rerun_calls_rerun_endpoint,
test_jobsctl_rerun_uses_default_secret_from_environment,
test_jobsctl_status_prints_error_when_api_returns_error) either pass the
explicit --api-url arg to jobcli.run or monkeypatch WORKER_API_BASE_URL to
"http://localhost:8090" so assertions against called["url"] remain
deterministic; ensure any tests that rely on a default secret set the env via
monkeypatch.setenv("API_SHARED_SECRET", ...) instead of depending on external
state.
Description
Implemented
POST /jobs/{job_id}/rerunto duplicate an existing job with original args/kwargs, preserved retry settings, and a new idempotency key usingmanual-rerun:{source_job_id}:{ULID}while keeping the source job unchanged.Updated DocuSeal processing to raise explicit retryable vs non-retryable errors and changed worker behavior so non-retryable DocuSeal failures mark jobs dead immediately while retryable failures keep existing retry scheduling behavior.
Added a new
jobsctlCLI entrypoint withstatusandrerunsubcommands, wired through worker package scripts, and documented fixed-timeout usage alongside new auth examples.Moved and expanded worker documentation to include API endpoint details, auth header expectations (
X-API-Secret), and rerun/idempotency contract; root README now links to the worker docs.Added unit tests for rerun endpoint handling, job actor failure classification, DocuSeal processor exception behavior, and jobsctl command execution.
Related Issue
How Has This Been Tested?
The commit passed local lint/type checks through the pre-commit hooks (ruff + mypy) during commit.
Summary by CodeRabbit
Release Notes
New Features
jobsctlCLI tool for inspecting job status and rerunning jobs.Documentation