Closed
24 changes: 5 additions & 19 deletions README.md
@@ -46,25 +46,8 @@ Migrations:

### Backend API Endpoints

- `GET /health`: Redis/Postgres/worker health check.
- `GET /jobs/{job_id}`: Fetch queued job status/result payload.
- `POST /jobs/resume-extract`: Enqueue resume profile extraction.
- `POST /jobs/resume-apply`: Enqueue confirmed CRM field apply.
- `POST /webhooks/{source}`: Generic webhook enqueue endpoint.
- `POST /webhooks/espocrm`: EspoCRM webhook endpoint (expects array payload).
- `POST /webhooks/espocrm/people-sync`: EspoCRM contact-change webhook for people cache sync.
- `POST /webhooks/docuseal`: See worker webhook contract docs in
[`apps/worker/README.md`](apps/worker/README.md#webhooks).
- `POST /process-contact/{contact_id}`: Manually enqueue one contact skills job.
- `POST /sync/people`: Manually enqueue a full CRM->people cache sync.
- `POST /audit/events`: Persist one human audit event (`discord` or `admin_dashboard`).
- `GET /auth/login`: Start OIDC Auth Code + PKCE login flow.
- `GET /auth/callback`: Complete OIDC callback and set HttpOnly session cookie.
- `GET /auth/me`: Return active session identity.
- `POST /auth/logout`: Clear active session cookie + server session.
- `POST /auth/discord/links`: Create one-time dashboard deep link from Discord command context.
- `GET /auth/discord/link/{token}`: Resolve Discord deep link into authenticated dashboard redirect.
- Auth flows emit best-effort human audit events (`auth.login`, `auth.logout`) under source `admin_dashboard`.
See the worker service docs: [`apps/worker/README.md#backend-api-endpoints`](apps/worker/README.md#backend-api-endpoints).
CLI request examples are documented at [`apps/worker/README.md#cli-usage`](apps/worker/README.md#cli-usage).

## Local Development

@@ -94,6 +77,9 @@ uv run --package integrations-worker backend-api

# Worker queue consumer
uv run --package integrations-worker worker-consumer

# Jobs CLI
uv run --package integrations-worker jobsctl --help
```

Or run the full stack with Docker Compose:
184 changes: 184 additions & 0 deletions apps/worker/README.md
@@ -1,5 +1,173 @@
# Worker Service

## Auth

- Protected ingest/job endpoints require `API_SHARED_SECRET` to be configured on the worker.
- Send the secret in header `X-API-Secret`.
- Header name is exactly `X-API-Secret` (not `X-API-Secret-Key`).
- `GET /health` and most OIDC session routes (`/auth/login`, `/auth/callback`, `/auth/me`, `/auth/logout`) do not use `X-API-Secret`.
- `POST /auth/discord/links` does use `X-API-Secret` because it is called by trusted backend/bot components.

Example:

```bash
curl -X GET "http://localhost:8090/jobs/<job_id>" \
-H "X-API-Secret: $API_SHARED_SECRET"
```
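The same authenticated call can be made from Python. The following is a minimal standard-library sketch; the helper name `build_job_status_request` is hypothetical and not part of the worker codebase, and the base URL is the local default used in this README.

```python
import urllib.parse
import urllib.request


def build_job_status_request(
    job_id: str,
    *,
    secret: str,
    base_url: str = "http://localhost:8090",
) -> urllib.request.Request:
    """Build (but do not send) an authenticated GET /jobs/{job_id} request.

    Hypothetical helper for illustration; not part of the worker codebase.
    """
    # Quote the id so unusual characters cannot alter the URL path.
    url = f"{base_url.rstrip('/')}/jobs/{urllib.parse.quote(job_id, safe='')}"
    # The worker expects the shared secret in the X-API-Secret header.
    return urllib.request.Request(url, headers={"X-API-Secret": secret}, method="GET")
```

To send it against a running worker, pass the result to `urllib.request.urlopen(request, timeout=10.0)`; non-2xx responses raise `urllib.error.HTTPError`.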

## CLI Usage

You can use the dedicated `jobsctl` command for common job operations.

Defaults:

- Base URL: `http://localhost:8090` (or `$WORKER_API_BASE_URL`)
- API secret: `$API_SHARED_SECRET` (sent as `X-API-Secret`)
- Timeout: fixed at `10.0` seconds (not configurable)
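One way such defaults could be resolved is sketched below. This is not the `jobsctl` implementation: the names `CliSettings` and `resolve_cli_settings` are hypothetical, and the precedence (explicit flag, then environment variable, then built-in default) is an assumption based on the `--api-url` and `--secret` flags and the environment variables named in this README.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

DEFAULT_BASE_URL = "http://localhost:8090"
REQUEST_TIMEOUT_SECONDS = 10.0  # Fixed, per the defaults listed above.


@dataclass(frozen=True)
class CliSettings:
    """Hypothetical container for resolved CLI settings."""

    base_url: str
    secret: Optional[str]
    timeout: float


def resolve_cli_settings(
    api_url: Optional[str] = None,
    secret: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> CliSettings:
    """Resolve settings; assumed precedence is flag > environment > default."""
    env = os.environ if env is None else env
    base_url = api_url or env.get("WORKER_API_BASE_URL") or DEFAULT_BASE_URL
    resolved_secret = secret or env.get("API_SHARED_SECRET") or None
    return CliSettings(base_url.rstrip("/"), resolved_secret, REQUEST_TIMEOUT_SECONDS)
```

Passing `env` explicitly keeps the function easy to test without touching the real process environment.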

Usage:

```bash
uv run --package integrations-worker jobsctl --help
uv run --package integrations-worker jobsctl status <job_id>
uv run --package integrations-worker jobsctl rerun <job_id>
```

Examples:

```bash
uv run --package integrations-worker jobsctl status job-123
```

```bash
uv run --package integrations-worker jobsctl rerun job-123
```

If needed, pass overrides explicitly:

```bash
uv run --package integrations-worker jobsctl \
--api-url http://localhost:8090 \
--secret "$API_SHARED_SECRET" \
rerun job-123
```

You can still use `curl` directly:

- Get job status:

```bash
curl -X GET "http://localhost:8090/jobs/<job_id>" \
-H "X-API-Secret: $API_SHARED_SECRET"
```

- Rerun a job:

```bash
curl -X POST "http://localhost:8090/jobs/<job_id>/rerun" \
-H "X-API-Secret: $API_SHARED_SECRET"
```

## Backend API Endpoints

- `GET /health`: Redis/Postgres/worker health check.
- `GET /jobs/{job_id}`: Fetch queued job status/result payload.
- `POST /jobs/{job_id}/rerun`: Enqueue a new job that duplicates an existing job's original call.
- `POST /jobs/resume-extract`: Enqueue resume profile extraction.
- `POST /jobs/resume-apply`: Enqueue confirmed CRM field apply.
- `POST /webhooks/{source}`: Generic webhook enqueue endpoint.
- `POST /webhooks/espocrm`: EspoCRM webhook endpoint (expects array payload).
- `POST /webhooks/espocrm/people-sync`: EspoCRM contact-change webhook for people cache sync.
- `POST /webhooks/docuseal`: DocuSeal agreement webhook endpoint.
- `POST /process-contact/{contact_id}`: Manually enqueue one contact skills job.
- `POST /sync/people`: Manually enqueue a full CRM->people cache sync.
- `POST /audit/events`: Persist one human audit event (`discord` or `admin_dashboard`).
- `GET /auth/login`: Start OIDC Auth Code + PKCE login flow.
- `GET /auth/callback`: Complete OIDC callback and set HttpOnly session cookie.
- `GET /auth/me`: Return active session identity.
- `POST /auth/logout`: Clear active session cookie + server session.
- `POST /auth/discord/links`: Create one-time dashboard deep link from Discord command context.
- `GET /auth/discord/link/{token}`: Resolve Discord deep link into authenticated dashboard redirect.
- Auth flows emit best-effort human audit events (`auth.login`, `auth.logout`) under source `admin_dashboard`.

## Jobs

### `GET /jobs/{job_id}`

Returns persisted job status and the latest result payload.

- Path params:
  - `job_id` (string): persisted job id.

### `POST /jobs/{job_id}/rerun`

Creates and enqueues a new duplicate job from the source job's original `args`/`kwargs`.

- The source job is not mutated.
- A new job row is persisted with a new `job_id`.
- Rerun idempotency key format: `manual-rerun:{source_job_id}:{ULID}`.
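To make the key format concrete, the sketch below builds a key of that shape with a dependency-free ULID generator (a millisecond timestamp followed by 80 random bits, encoded in Crockford base32). It follows the same approach as the helper added in this change, but it is a standalone illustration and the function names are hypothetical.

```python
import os
import time

# Crockford base32 alphabet used by ULIDs (no I, L, O, U).
ULID_ALPHABET = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"


def encode_base32(value: int, length: int) -> str:
    """Encode the low `length * 5` bits of `value`, most significant digit first."""
    chars = []
    for _ in range(length):
        chars.append(ULID_ALPHABET[value & 0x1F])
        value >>= 5
    return "".join(reversed(chars))


def generate_ulid() -> str:
    """Return a 26-character ULID: 10 timestamp chars followed by 16 random chars."""
    timestamp_ms = int(time.time() * 1000)
    random_value = int.from_bytes(os.urandom(10), "big")  # 80 random bits
    return encode_base32(timestamp_ms, 10) + encode_base32(random_value, 16)


def build_rerun_idempotency_key(source_job_id: str) -> str:
    """Build a key of the form manual-rerun:{source_job_id}:{ULID}."""
    return f"manual-rerun:{source_job_id}:{generate_ulid()}"
```

Because the ULID suffix differs on every call, two reruns of the same source job yield two different keys.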

Example:

```bash
curl -X POST "http://localhost:8090/jobs/<job_id>/rerun" \
-H "X-API-Secret: $API_SHARED_SECRET"
```

Example success response (`202`):

```json
{
  "status": "queued",
  "source_job_id": "job-old-1",
  "job_id": "job-new-1",
  "type": "process_docuseal_agreement_job",
  "created": true
}
```
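A client will usually want the new `job_id` from this response so it can poll `GET /jobs/{job_id}` afterwards. The sketch below shows one way to read it; `RerunError` and `parse_rerun_response` are hypothetical names, and the error body shape (`{"error": "..."}`) is taken from the handler in this change.

```python
import json


class RerunError(Exception):
    """Raised when the rerun endpoint does not return 202 (hypothetical type)."""


def parse_rerun_response(status_code: int, body: str) -> str:
    """Return the new job id from a rerun response, or raise RerunError."""
    payload = json.loads(body)
    if status_code != 202:
        # Error bodies in this change look like {"error": "job_not_found"}.
        error = payload.get("error", "unknown")
        raise RerunError(f"rerun failed ({status_code}): {error}")
    return payload["job_id"]
```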

### `POST /jobs/resume-extract`

Enqueues one resume extraction job.

- JSON body:
  - `contact_id` (string, required)
  - `attachment_id` (string, required)
  - `filename` (string, required)

Example:

```bash
curl -X POST "http://localhost:8090/jobs/resume-extract" \
  -H "X-API-Secret: $API_SHARED_SECRET" \
  -H "Content-Type: application/json" \
  -d '{
    "contact_id": "contact-123",
    "attachment_id": "att-456",
    "filename": "resume.pdf"
  }'
```

### `POST /jobs/resume-apply`

Enqueues one CRM apply job after resume update confirmation.

- JSON body:
  - `contact_id` (string, required)
  - `updates` (object[string->string], required): CRM field updates.
  - `link_discord` (object, optional): `{ "user_id": "...", "username": "..." }`
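As an illustration of the body shape described above, here is a minimal sketch that serializes a request body. The helper name `build_resume_apply_body` is hypothetical, and the field name used in the usage note is a placeholder rather than a real CRM field.

```python
import json
from typing import Dict, Optional


def build_resume_apply_body(
    contact_id: str,
    updates: Dict[str, str],
    link_discord: Optional[Dict[str, str]] = None,
) -> str:
    """Serialize a resume-apply body; `link_discord` is omitted when not given."""
    body: dict = {"contact_id": contact_id, "updates": dict(updates)}
    if link_discord is not None:
        # Only the two documented keys are forwarded.
        body["link_discord"] = {
            "user_id": link_discord["user_id"],
            "username": link_discord["username"],
        }
    return json.dumps(body)
```

For example, `build_resume_apply_body("contact-123", {"some_field": "value"})` produces a body with `contact_id` and `updates` only.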

### `POST /process-contact/{contact_id}`

Manually enqueues one contact skills job.

- Path params:
  - `contact_id` (string, required)

### `POST /sync/people`

Manually enqueues a full CRM -> people cache sync.

## Webhooks

### `POST /webhooks/docuseal`
@@ -8,15 +176,31 @@ Enqueues DocuSeal agreement-signing jobs.

- Job input contract for queueing: `completed_at` is a UTC string using `YYYY-MM-DD HH:mm:ss`.
- Example value: `2026-03-02 10:02:30`.
- Required payload fields:
  - `event_type` must be `form.completed`
  - `data.email` non-empty signer email
  - `data.completed_at` or top-level `timestamp` (ISO timestamp string)
  - `data.template.id` must match configured `DOCUSEAL_MEMBER_AGREEMENT_TEMPLATE_ID`
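The checks above, together with the conversion from an ISO timestamp to the queue's `YYYY-MM-DD HH:mm:ss` UTC format, can be sketched as follows. This is an illustration, not the worker's actual validation: the function names and error labels are hypothetical, and two behaviours are assumptions (template ids are compared as strings, and timestamps without an offset are treated as UTC).

```python
from datetime import datetime, timezone
from typing import Any, Mapping, Optional


def to_queue_completed_at(iso_timestamp: str) -> str:
    """Convert an ISO 8601 timestamp to a UTC `YYYY-MM-DD HH:MM:SS` string."""
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    parsed = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are already UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


def validate_docuseal_payload(
    payload: Mapping[str, Any], template_id: str
) -> Optional[str]:
    """Return a (hypothetical) error label for the first failed check, else None."""
    if payload.get("event_type") != "form.completed":
        return "unexpected_event_type"
    data = payload.get("data")
    if not isinstance(data, dict):
        return "missing_data"
    email = data.get("email")
    if not isinstance(email, str) or not email.strip():
        return "missing_email"
    if not (data.get("completed_at") or payload.get("timestamp")):
        return "missing_completed_at"
    template = data.get("template")
    # Assumption: ids are compared as strings so 42 and "42" match.
    if not isinstance(template, dict) or str(template.get("id")) != str(template_id):
        return "template_mismatch"
    return None
```

Returning a label instead of raising keeps the sketch easy to map onto an HTTP error response, but either style would work.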

### `POST /webhooks/{source}`

Generic webhook enqueue endpoint.

- Path params:
  - `source` (string, required): source label written into job payload.
- JSON body:
  - Any JSON object payload.

### `POST /webhooks/espocrm`

EspoCRM webhook endpoint (expects array payload).

- JSON body:
  - Array of event objects, each with at least `id` (string).
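A sender can check this shape before posting. The sketch below is a hypothetical helper (`extract_event_ids` is not part of the worker) that accepts an already-decoded JSON value and enforces only the two rules stated above: the payload is an array, and each element is an object with a string `id`.

```python
from typing import Any, List


def extract_event_ids(payload: Any) -> List[str]:
    """Return the `id` of each event, or raise ValueError if the shape is wrong."""
    if not isinstance(payload, list):
        raise ValueError("payload must be a JSON array")
    ids = []
    for index, event in enumerate(payload):
        if not isinstance(event, dict) or not isinstance(event.get("id"), str):
            raise ValueError(f"event {index} must be an object with a string id")
        ids.append(event["id"])
    return ids
```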

### `POST /webhooks/espocrm/people-sync`

EspoCRM contact-change webhook for people cache sync.

- JSON body:
  - Array of event objects, each with at least `id` (string).
1 change: 1 addition & 0 deletions apps/worker/pyproject.toml
@@ -28,6 +28,7 @@ five08 = { workspace = true }
[project.scripts]
backend-api = "five08.backend.api:run"
worker-consumer = "five08.worker.consumer:run"
jobsctl = "five08.jobcli:run"

[tool.alembic]
script_location = "src/five08/worker/migrations"
92 changes: 92 additions & 0 deletions apps/worker/src/five08/backend/api.py
@@ -5,6 +5,7 @@
import asyncio
import contextlib
import logging
import os
import secrets
import time
from contextlib import asynccontextmanager
@@ -72,6 +73,7 @@
)

logger = logging.getLogger(__name__)
_ULID_ALPHABET = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"


class ResumeExtractRequest(BaseModel):
@@ -97,6 +99,17 @@ class DiscordLinkCreateRequest(BaseModel):
    next_path: str | None = None


_JOB_FUNCTIONS: dict[str, Any] = {
    process_webhook_event.__name__: process_webhook_event,
    process_contact_skills_job.__name__: process_contact_skills_job,
    extract_resume_profile_job.__name__: extract_resume_profile_job,
    apply_resume_profile_job.__name__: apply_resume_profile_job,
    sync_people_from_crm_job.__name__: sync_people_from_crm_job,
    sync_person_from_crm_job.__name__: sync_person_from_crm_job,
    process_docuseal_agreement_job.__name__: process_docuseal_agreement_job,
}


def _is_authorized(request: Request) -> bool:
    """Validate shared API secret."""
    if not settings.api_shared_secret:
@@ -112,6 +125,21 @@ def _is_authorized(request: Request) -> bool:
return False


def _encode_ulid_base32(value: int, length: int) -> str:
    encoded = ["0"] * length
    for index in range(length - 1, -1, -1):
        encoded[index] = _ULID_ALPHABET[value & 0x1F]
        value >>= 5
    return "".join(encoded)


def _generate_ulid() -> str:
    """Generate a sortable ULID string without external dependencies."""
    timestamp_ms = int(time.time() * 1000)
    random_value = int.from_bytes(os.urandom(10), "big")
    return f"{_encode_ulid_base32(timestamp_ms, 10)}{_encode_ulid_base32(random_value, 16)}"


def _extract_idempotency_key(value: object) -> str | None:
    if isinstance(value, str) and value.strip():
        return value.strip()
@@ -632,6 +660,69 @@ async def job_status_handler(request: Request, job_id: str) -> JSONResponse:
)


async def rerun_job_handler(request: Request, job_id: str) -> JSONResponse:
    """Create and enqueue a new job using a prior job's original call payload."""
    if not _is_authorized(request):
        return JSONResponse({"error": "unauthorized"}, status_code=401)

    normalized_job_id = job_id.strip()
    if not normalized_job_id:
        return JSONResponse({"error": "job_id_required"}, status_code=400)

    source_job = await asyncio.to_thread(get_job, settings, normalized_job_id)
    if source_job is None:
        return JSONResponse({"error": "job_not_found"}, status_code=404)

    fn = _JOB_FUNCTIONS.get(source_job.type)
    if fn is None:
        return JSONResponse(
            {
                "error": "unsupported_job_type",
                "job_type": source_job.type,
            },
            status_code=400,
        )

    raw_payload = source_job.payload if isinstance(source_job.payload, dict) else {}
    raw_args = raw_payload.get("args", [])
    raw_kwargs = raw_payload.get("kwargs", {})
    if not isinstance(raw_args, list) or not isinstance(raw_kwargs, dict):
        return JSONResponse({"error": "invalid_job_payload"}, status_code=400)
Comment on lines +686 to +690
⚠️ Potential issue | 🟠 Major

Reject malformed source payloads before enqueueing reruns.

Line 686 currently coerces non-dict payloads to {}, and missing args/kwargs fall back to []/{}. That can enqueue a rerun with the wrong call signature instead of duplicating the source job.

✅ Suggested fix
-    raw_payload = source_job.payload if isinstance(source_job.payload, dict) else {}
-    raw_args = raw_payload.get("args", [])
-    raw_kwargs = raw_payload.get("kwargs", {})
-    if not isinstance(raw_args, list) or not isinstance(raw_kwargs, dict):
+    raw_payload = source_job.payload
+    if not isinstance(raw_payload, dict):
+        return JSONResponse({"error": "invalid_job_payload"}, status_code=400)
+
+    if "args" not in raw_payload or "kwargs" not in raw_payload:
+        return JSONResponse({"error": "invalid_job_payload"}, status_code=400)
+
+    raw_args = raw_payload["args"]
+    raw_kwargs = raw_payload["kwargs"]
+    if not isinstance(raw_args, list) or not isinstance(raw_kwargs, dict):
         return JSONResponse({"error": "invalid_job_payload"}, status_code=400)

As per coding guidelines, apps/worker/src/five08/backend/api.py: "Worker ingest endpoints must validate input, persist jobs, enqueue, and return 202 quickly without performing long processing".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/worker/src/five08/backend/api.py` around lines 686 - 690, The current
code coerces non-dict source_job.payload to {} and silently supplies defaults
for missing args/kwargs, which can produce an incorrect rerun; update the
validation to reject malformed payloads: first check that source_job.payload is
a dict and return a 400 JSONResponse if not, then require that payload contains
"args" and "kwargs" keys of types list and dict respectively (use the existing
raw_payload, raw_args, raw_kwargs identifiers) and return
JSONResponse({"error":"invalid_job_payload"}, status_code=400) if those checks
fail so only well-formed jobs are enqueued.


    queue = request.app.state.queue
    rerun_idempotency_key = f"manual-rerun:{source_job.id}:{_generate_ulid()}"

    try:
        rerun_job: EnqueuedJob = await asyncio.to_thread(
            enqueue_job,
            queue=queue,
            fn=fn,
            args=tuple(raw_args),
            kwargs=raw_kwargs,
            settings=settings,
            idempotency_key=rerun_idempotency_key,
            max_attempts=source_job.max_attempts,
        )
    except Exception:
        logger.exception(
            "Failed rerunning job source_job_id=%s type=%s",
            source_job.id,
            source_job.type,
        )
        return JSONResponse({"error": "enqueue_failed"}, status_code=503)

    return JSONResponse(
        {
            "status": "queued",
            "source_job_id": source_job.id,
            "job_id": rerun_job.id,
            "type": source_job.type,
            "created": rerun_job.created,
        },
        status_code=202,
    )


async def sync_people_handler(request: Request) -> JSONResponse:
    """Manual enqueue for a full CRM->people cache sync."""
    if not _is_authorized(request):
@@ -1360,6 +1451,7 @@ def create_app(*, run_lifespan: bool = True) -> FastAPI:
    app.add_api_route("/health", health_handler, methods=["GET"])

    app.add_api_route("/jobs/{job_id}", job_status_handler, methods=["GET"])
    app.add_api_route("/jobs/{job_id}/rerun", rerun_job_handler, methods=["POST"])
    app.add_api_route("/jobs/resume-extract", resume_extract_handler, methods=["POST"])
    app.add_api_route("/jobs/resume-apply", resume_apply_handler, methods=["POST"])
