feat(jobs): show who requests which tasks in ML processing job logs #1238
Conversation
Async_api jobs have a silence window between "Created NATS stream" and the first "Updated job progress" line, where Django only logs results coming back. Operators viewing the per-job log can't tell whether a worker is working, missing, slow, or hitting a wall. Mirror the task-fetch and result-POST paths to job.logger so the per-job log view reflects worker polling activity:

- Successful fetch: log requested/delivered batch sizes and caller info.
- Early exit on terminal-status jobs: makes the "phantom-pull" pattern (workers polling against jobs whose NATS stream still exists after terminal state) visible from the per-job log.
- NATS-unavailable 503: mirror the existing module-logger warning.
- Result POST: mirror the existing "Queued pipeline result" line with user/token_id added.

The module-logger copy stays for ops monitoring.

Co-Authored-By: Claude <noreply@anthropic.com>
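The fetch-and-log flow described above can be sketched as a minimal standalone function. The `Job` stub, `TERMINAL_STATES` set, and `fetch_tasks` name are illustrative stand-ins for the real Django view logic, not the project's actual API:

```python
from dataclasses import dataclass, field

# Assumed set of terminal job states; the real project defines its own.
TERMINAL_STATES = {"SUCCESS", "FAILURE", "CANCELED"}

@dataclass
class Job:
    """Minimal stand-in for the Job model, with an in-memory per-job log."""
    status: str
    queue: list = field(default_factory=list)
    log_lines: list = field(default_factory=list)

    def log(self, msg: str) -> None:
        # Stands in for job.logger.info(), which appends to logs.stdout.
        self.log_lines.append(msg)

def fetch_tasks(job: Job, requested: int) -> list:
    """Return up to `requested` tasks, mirroring each poll to the per-job log."""
    if job.status in TERMINAL_STATES:
        # Phantom-pull: a worker polling a job whose NATS stream outlived its state.
        job.log(f"Tasks requested for non-active job (status={job.status}); returning empty.")
        return []
    delivered = job.queue[:requested]
    job.log(f"Tasks fetched: requested={requested}, delivered={len(delivered)}")
    return delivered
```

A terminal-status job gets the early-exit line instead of silently returning nothing, which is exactly the signal the commit restores.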
Operators viewing the per-job log have to compute throughput by hand from started_at + processed/total to tell stalled, slow, and healthy-but-throttled jobs apart. Add a rolling rate line emitted from _update_job_progress on every process/results stage update:

Job N throughput: elapsed=Xm YYs, processed=P/T, rate=R imgs/min, ETA=...

This is intentionally a plain division over total elapsed time, not a rolling-window forecast — cheap to compute, accurate enough to spot a stall, and easy to read from one line. It is skipped defensively when started_at is None, elapsed is under 3s, or the stage is not process/results, and it emits rate=0.0 / ETA=unknown while processed_count is still zero.

Co-Authored-By: Claude <noreply@anthropic.com>
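The plain-division rate and the guards above can be sketched as follows. Function and parameter names are illustrative (the real helper is _log_job_throughput in ami/jobs/tasks.py, whose exact signature this does not claim to reproduce):

```python
from datetime import datetime, timedelta, timezone

def throughput_line(job_id, started_at, processed, total, now=None):
    """Build the throughput/ETA line, or return None when it should be skipped.

    Skips: no started_at, or under 3 seconds elapsed. Emits rate=0.0 /
    ETA=unknown while nothing has been processed yet.
    """
    if started_at is None:
        return None
    now = now or datetime.now(timezone.utc)
    elapsed = (now - started_at).total_seconds()
    if elapsed < 3:
        return None
    if processed == 0:
        rate, eta = 0.0, "unknown"
    else:
        rate = processed / elapsed * 60  # images per minute, total-elapsed average
        eta = f"{(total - processed) / (processed / elapsed):.0f}s"
    mins, secs = divmod(int(elapsed), 60)
    return (
        f"Job {job_id} throughput: elapsed={mins}m {secs:02d}s, "
        f"processed={processed}/{total}, rate={rate:.1f} imgs/min, ETA={eta}"
    )
```

Because the rate is an average over the whole run, a stall shows up as the rate decaying and the ETA growing, which is the one-line signal the commit is after.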
Walkthrough: Added helper functions to compute and emit job throughput metrics and estimated completion time. Integrated per-job logging into the task and result endpoints to track processing rates and user/token context. Added comprehensive test coverage for throughput calculations and endpoint logging behavior.
Pull request overview

Adds per-job observability log lines for async_api ML jobs so worker polling, result posting, and throughput/ETA are visible in GET /api/v2/jobs/{id}/ → logs.stdout.

Changes:
- Log task-fetch activity (including terminal-job early exit) to the per-job logger in JobViewSet.tasks().
- Mirror the result-POST "Queued pipeline result ..." line to the per-job logger in JobViewSet.result().
- Add _log_job_throughput to emit throughput/ETA lines on process/results progress updates.
Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

| File | Description |
|---|---|
| ami/jobs/views.py | Adds per-job logging for task fetches, terminal-job early exit, and result-POST queuing. |
| ami/jobs/tasks.py | Adds throughput/ETA computation and logs it during progress updates. |
| ami/jobs/tests/test_jobs.py | Adds unit tests asserting the new per-job log lines and throughput behavior. |
🧹 Nitpick comments (1)

ami/jobs/views.py (1)

270-271: Extract actor-context building into a shared helper to avoid drift. The same user_desc/token_id logic appears in both actions. Centralizing it will keep formatting consistent and make future adjustments safer.

♻️ Proposed refactor

```diff
 class JobViewSet(DefaultViewSet, ProjectMixin):
+    @staticmethod
+    def _actor_log_context(request) -> tuple[str, int | None]:
+        user_desc = getattr(request.user, "email", None)
+        if not user_desc:
+            user_pk = getattr(request.user, "pk", None)
+            user_desc = f"user_id={user_pk}" if user_pk is not None else "unknown"
+        token_id = getattr(request.auth, "pk", None)
+        return user_desc, token_id
+
     @action(detail=True, methods=["post"], name="tasks")
     def tasks(self, request, pk=None):
@@
-        user_desc = getattr(request.user, "email", None) or str(request.user)
-        token_id = getattr(request.auth, "pk", None)
+        user_desc, token_id = self._actor_log_context(request)
@@
     @action(detail=True, methods=["post"], name="result")
     def result(self, request, pk=None):
@@
-        user_desc = getattr(request.user, "email", None) or str(request.user)
-        token_id = getattr(request.auth, "pk", None)
+        user_desc, token_id = self._actor_log_context(request)
```

Also applies to: 330-331
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/jobs/views.py` around lines 270 - 271, Extract the repeated logic that builds user_desc and token_id into a single helper (e.g., get_actor_context(request) or build_actor_context(request)) that returns a dict or tuple with user_desc and token_id computed via getattr(request.user, "email", None) or str(request.user) and getattr(request.auth, "pk", None); replace the duplicated lines in the two action handlers (the blocks that currently set user_desc = ... and token_id = ...) to call this helper and use its return value, and add a small unit/test or docstring so future changes are made in one place.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: eea1b9e3-3475-42aa-b412-51e7354759e4
📒 Files selected for processing (3)
ami/jobs/tasks.py, ami/jobs/tests/test_jobs.py, ami/jobs/views.py
- Fix 1 (token leak): introduce an _actor_log_context() helper that truncates Token.pk to an 8-char fingerprint before writing to job.logs.stdout. DRF's Token.pk IS the 40-char bearer secret, so logging the full value exposed credentials to every project member.
- Fix 2 (noisy empty polls): tasks() now logs at DEBUG when delivered==0 so zero-task polls stay out of the operator-facing per-job log feed; INFO is kept for delivered>0. Three unit tests cover the split.
- Fix 3 (throughput log inside atomic): wrap _log_job_throughput() in try/except so a logging failure does not roll back the progress update. The warning is emitted on the module logger to avoid recursion.

Co-Authored-By: Claude <noreply@anthropic.com>
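Fix 1 can be sketched as below. This is a hypothetical variant, not the project's actual helper: it hashes before truncating (the commit describes plain truncation of Token.pk), and the argument names are assumptions:

```python
import hashlib

def actor_log_context(user_email, user_pk, token_key):
    """Build (user_desc, token_fingerprint) for per-job log lines.

    DRF's Token.pk IS the bearer secret, so only a short fingerprint may
    reach job.logs.stdout. Hashing first (a variant on the commit's plain
    truncation) avoids leaking even a prefix of the secret.
    """
    user_desc = user_email or (f"user_id={user_pk}" if user_pk is not None else "unknown")
    # 8 hex chars: enough to correlate requests, useless for authentication.
    token_fp = hashlib.sha256(token_key.encode()).hexdigest()[:8] if token_key else None
    return user_desc, token_fp
```

Either way, the invariant is the same: the string written to the operator-visible log must not be usable as a credential.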
Fires on every async_api task_postrun — always true under normal operation (Celery task ends when images are queued; async workers drive the actual stages afterward). A genuinely stuck async job shows up as absence of progress/throughput lines, not this message. Same rationale as the delivered=0 → DEBUG move earlier in this PR. Co-Authored-By: Claude <noreply@anthropic.com>
What this does (plain English)
While an ML job is running, the job's log view was mostly empty during the long stretches where workers were actually doing the work. This PR fills that in: every worker poll, every result POST back from a worker, and a progress-rate line all show up in the job log now — along with which user and API token drove each request. Makes it much easier to tell at a glance whether a job is healthy, slow, or stuck, and who's doing what.
Summary
Adds structured observability log lines to the per-job logger for async_api ML jobs, so worker activity is visible from the job log view (GET /api/v2/jobs/{id}/ → logs.stdout). Three additions in ami/jobs/views.py and ami/jobs/tasks.py:

- Task fetches (JobViewSet.tasks()): `Tasks fetched: requested=..., delivered=..., user=..., token_id=...` on every worker poll. Fills in the silence window between `Created NATS stream` and the first `Updated job progress` line — previously the job log showed nothing during this stretch even though workers were actively polling.
- Result POSTs (JobViewSet.result()): `Queued pipeline result: task_id=..., reply_subject=..., user=...` mirrors the existing module-logger emission to the per-job log. The module-logger copy is retained for ops-level monitoring.
- Throughput (_log_job_throughput in ami/jobs/tasks.py): `Job {id} throughput: elapsed=..., processed=P/T, rate=R imgs/min, ETA=...` on process/results stage progress updates. Intentionally a plain processed/elapsed division rather than a rolling-window estimator — accurate enough to distinguish stalled-vs-slow from healthy-but-throttled, cheap, and easy to eyeball. Guarded by started_at != None and skipped on non-process/results stages.

Additionally, POST /tasks/ against a terminal job now logs `Tasks requested for non-active job (status=...); returning empty.` to the per-job log instead of silently returning {"tasks": []}. This restores the signal used to detect workers still polling terminal jobs.

Test plan

- ami.jobs.tests.test_jobs — unit tests cover all four log lines (65/65 pass)
- E2E (async_api job) — all four signals observed in the live logs.stdout view; throughput-line ETAs monotonically decrease and match DB counts
E2E validation
Ran against a local compose stack with one async_api ML job: 50-image random collection, pipeline quebec_vermont_moths_2023, ADC worker. Job completed successfully in ~4 minutes (50 images / 483 detections). Pulled the resulting job.logs.stdout via GET /api/v2/jobs/{id}/ and grep-counted each of the four new log lines. User/token identifiers redacted below.

Tasks fetched — 68 lines. The delivered value correctly tracks the silence window (8× delivered=8 during initial pulls, 2× delivered=2 partial, 58× delivered=0 during the batch-processing stretch when no work is outstanding).

Queued pipeline result — 84 per-job lines observed; the module-logger copy landed 100× in the django container logs (both emissions sit in the same for loop). The per-job log is ~16/100 below the module log — likely a Job-row save race when multiple /result/ POSTs land concurrently (job.logger.info appends to a JSONB list on the Job row, and overlapping writes can lose entries). Not a correctness issue for the feature, but worth filing as a follow-up; the module-logger copy was retained exactly so ops doesn't rely solely on the per-job view.

Throughput / ETA — 64 lines, all on process/results stages (none on collect, confirming the stage filter). First line at elapsed=2m 11s (confirming the started_at guard). processed matches SourceImageCollection.source_images_count (50). ETA shrinks monotonically once processing ramps up.

Phantom-pull early-exit — after the job hit SUCCESS, posting an extra /tasks/ call returned {"tasks":[]} and wrote the expected line to logs.stdout.
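The lost-entry race flagged above (concurrent result POSTs each read-modify-write the Job row's JSONB log list) disappears once the appends are serialized. A minimal sketch, using a thread lock as a stand-in for Django's row-level `select_for_update()` inside `transaction.atomic()` — the class and field names are illustrative, not the project's actual fix:

```python
import threading

class JobLog:
    """Append-only log list whose read-modify-write is serialized by a lock.

    In Django this would be Job.objects.select_for_update().get(pk=...) inside
    transaction.atomic(), so overlapping /result/ POSTs queue up instead of
    overwriting each other's freshly appended entries.
    """
    def __init__(self):
        self._lines: list[str] = []
        self._lock = threading.Lock()

    def append(self, line: str) -> None:
        with self._lock:  # without this, concurrent appends can lose entries
            current = list(self._lines)   # read the stored list
            current.append(line)          # modify the local copy
            self._lines = current         # write the whole list back
```

With the lock held across the whole read-modify-write, fifty concurrent appends always yield fifty entries; the unguarded version can drop some, which matches the ~16/100 shortfall observed in the E2E run.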