feat(ml): propagate pipeline config through NATS pull-mode tasks#1279

Open
mihow wants to merge 3 commits into main from worktree-job-results-response-cleanup

Conversation

mihow (Collaborator) commented Apr 30, 2026

Summary

When Antenna runs an async ML job, workers pull tasks from NATS instead of receiving an HTTP request. Until now, those NATS tasks did not carry the pipeline's configuration — only the sync HTTP path did. Any per-project overrides set via ProjectPipelineConfig were silently ignored by pull-mode workers, which fell back to their own env vars.

This PR adds a config field to the NATS task and result schemas and populates it from pipeline.get_config(project_id=...). It is structural only — no new config parameters are introduced, and the value is not yet read during inference. Using config to drive worker behavior is a follow-up.

Also fixes a pre-existing bug in Pipeline.get_config() that mutated self.default_config in place when a project override was applied, so the in-memory model attribute (and subsequent get_config() calls) leaked the override.
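
The mutation bug can be illustrated with a small standalone sketch. The helper names here are hypothetical stand-ins for `Pipeline.get_config()`; the real method returns a `PipelineRequestConfigParameters`, not a plain dict:

```python
# Illustrative sketch only: get_config_buggy/get_config_fixed are
# hypothetical stand-ins for Pipeline.get_config(); the real code
# returns a PipelineRequestConfigParameters, not a plain dict.
from typing import Optional


def get_config_buggy(default_config: dict, override: Optional[dict]) -> dict:
    config = default_config  # aliases the stored default -- no copy
    if override:
        config.update(override)  # mutates default_config in place
    return config


def get_config_fixed(default_config: Optional[dict], override: Optional[dict]) -> dict:
    config = dict(default_config or {})  # fresh dict per call
    if override:
        config.update(override)
    return config


defaults = {"example_param": "default"}
get_config_buggy(defaults, {"example_param": "project_override"})
assert defaults["example_param"] == "project_override"  # override leaked

defaults = {"example_param": "default"}
merged = get_config_fixed(defaults, {"example_param": "project_override"})
assert defaults["example_param"] == "default"  # defaults untouched
assert merged["example_param"] == "project_override"
```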

Finally, adds a small audit log: when a worker echoes back the config it actually used, Antenna now compares it against the current pipeline config and logs a warning on drift (e.g. ProjectPipelineConfig was edited mid-job). Persisted audit trails are a future addition.

  • PipelineProcessingTask gains config: PipelineRequestConfigParameters | dict | None (was commented-out TODO)
  • PipelineResultsResponse gains matching config field so workers can echo back what config was actually used
  • queue_images_to_nats calls pipeline.get_config(project_id=job.project.pk) once per job and embeds the result in each task, so per-project ProjectPipelineConfig overrides reach pull-mode workers
  • Pipeline.get_config() now returns a fresh PipelineRequestConfigParameters rather than mutating self.default_config
  • process_nats_pipeline_result warns when echoed config drifts from the pipeline's current configuration
  • Documents the NATS task contract in processing_services/README.md
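
The queuing change in the bullets above can be sketched roughly as follows. `build_tasks` and the plain-dict task shape are illustrative; the real implementation resolves the config once per job via `pipeline.get_config(project_id=job.project.pk)` and publishes `PipelineProcessingTask` objects to NATS:

```python
# Hypothetical sketch: plain dicts stand in for PipelineProcessingTask;
# the real code publishes typed task objects to NATS.


def build_tasks(image_ids: list, pipeline_config: dict) -> list:
    # one resolved config per job, copied into each task payload so
    # tasks do not alias a single mutable dict
    return [
        {"image_id": image_id, "config": dict(pipeline_config)}
        for image_id in image_ids
    ]


tasks = build_tasks(["101", "102"], {"example_param": "project_override"})
assert all(t["config"]["example_param"] == "project_override" for t in tasks)
```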

Test plan

  • New unit test: TestJobView.test_queue_images_to_nats_embeds_pipeline_config — verifies that a ProjectPipelineConfig override propagates into the config field of each queued PipelineProcessingTask
  • New unit test: TestPipeline.test_get_config_does_not_mutate_default_config — resolves project-scoped config first and asserts default_config is untouched
  • New unit tests: test_pipeline_config_drift_logs_warning and test_pipeline_config_match_does_not_warn — verifies the audit log fires on mismatch and stays silent on match
  • Existing tasks endpoint tests unchanged

Follow-ups

A short proposal for moving the per-task config field to a job-level fetch (since all tasks in a job share the same config) lives at docs/claude/planning/pipeline-config-job-level-fetch.md. Worth doing before configs grow to include taxa allow-lists or per-stage hyperparameters.

Companion PR

RolnickLab/ami-data-companion#146 — ADC schema update to receive and echo config

Closes #1275

Summary by CodeRabbit

  • New Features

    • Per-task pipeline configuration is now included with queued async jobs and echoed with processing results.
  • Bug Fixes

    • Pipeline default config is no longer mutated when resolving project overrides.
  • Tests

    • Added unit and E2E-style tests verifying config propagation, echoing, and drift detection behavior.
  • Documentation

    • Expanded async processing docs describing task-level config, expected worker behavior, and drift auditing.

PipelineProcessingTask now carries a `config` field populated from
pipeline.get_config(project_id=...) in queue_images_to_nats. This
allows pull-mode workers (ADC and others) to receive per-project
pipeline configuration rather than relying solely on worker-level
env vars.

PipelineResultsResponse gains a matching `config` field so workers
can echo back the configuration that was actually used.

Closes #1275

Co-Authored-By: Claude <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 30, 2026 02:06
coderabbitai Bot commented Apr 30, 2026

📝 Walkthrough

Configuration parameters now propagate through NATS pull-mode tasks: PipelineProcessingTask and PipelineResultsResponse gain an optional config field; queuing derives per-job pipeline config (via pipeline.get_config(project_id=...)) and includes it when publishing tasks to NATS workers.

Changes

  • Schema Updates — ami/ml/schemas.py: Added an optional `config: PipelineRequestConfigParameters | dict | None` field to the task and result schemas.
  • Orchestration Logic — ami/ml/orchestration/jobs.py: queue_images_to_nats now calls pipeline.get_config(project_id=...) and populates each PipelineProcessingTask.config before publishing.
  • Job Result Handling — ami/jobs/tasks.py: On NATS result processing, compare echoed pipeline_result.config to the current persisted pipeline config and emit a warning log when they differ (observation-only; failures are logged but non-fatal).
  • Pipeline Model — ami/ml/models/pipeline.py: Pipeline.get_config now constructs a PipelineRequestConfigParameters from self.default_config (ensuring a typed copy) before applying project overrides.
  • Tests — ami/jobs/tests/test_jobs.py, ami/jobs/tests/test_tasks.py, ami/ml/tests.py: Added unit/integration tests: verify config is included when queuing NATS tasks, that process_nats_pipeline_result emits drift warnings (or not) based on echoed config, and that get_config does not mutate default_config.
  • Docs & Planning — processing_services/README.md, docs/claude/planning/pipeline-config-job-level-fetch.md: Documented the pull-mode contract (tasks include config merged from default_config + ProjectPipelineConfig); the planning doc explores a job-level fetch alternative with migration notes.

Sequence Diagram

```mermaid
sequenceDiagram
    participant Antenna as Antenna<br/>(Job Service)
    participant NATS as NATS<br/>(Message Broker)
    participant Worker as Pull-Mode<br/>Worker

    Antenna->>Antenna: Receive job & pipeline metadata
    Antenna->>Antenna: pipeline.get_config(project_id)
    Antenna->>NATS: Publish PipelineProcessingTask (includes config)
    NATS->>Worker: Worker pulls task
    Worker->>Worker: Read task.config (or fallback)
    Worker->>NATS: Publish PipelineResultsResponse (includes used config)
    NATS->>Antenna: Deliver result
    Antenna->>Antenna: process_nats_pipeline_result -> compare echoed config vs persisted -> log warning if mismatch
```

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Suggested labels

ml, backend

Poem

🐰 Hopping through queues with configs snug,
Tasks carry flavors like carrots in a mug.
Workers read, process, and hum a soft tune,
Echo back configs beneath the moon.
Hooray—per-job settings travel soon!

🚥 Pre-merge checks: ✅ 5 passed

  • Title check ✅ Passed — The title accurately summarizes the main change: propagating pipeline configuration through NATS pull-mode tasks, which is the primary objective of this PR.
  • Description check ✅ Passed — The description comprehensively covers the summary, changes, test plan, and follow-ups, closely matching the template structure with all critical sections populated.
  • Linked Issues check ✅ Passed — The PR fully addresses the requirements from #1275: adds a config field to PipelineProcessingTask, populates it via pipeline.get_config(project_id=...), includes tests demonstrating ProjectPipelineConfig propagation, and documents the NATS contract.
  • Out of Scope Changes check ✅ Passed — All changes directly support the objective of propagating pipeline config through NATS tasks. The bug fix to Pipeline.get_config and config drift audit logging are justifiable enhancements that address pre-existing issues and strengthen the feature implementation.
  • Docstring Coverage ✅ Passed — Docstring coverage is 87.50%, which meets the required threshold of 80.00%.


Copilot AI (Contributor) left a comment

Pull request overview

Adds pipeline config propagation to the async pull-mode (NATS) task/result schemas so per-project ProjectPipelineConfig overrides reach pull-mode workers, aligning async behavior with the existing synchronous /process path.

Changes:

  • Add config to PipelineProcessingTask and PipelineResultsResponse schemas.
  • Embed per-project pipeline config into each queued NATS task in queue_images_to_nats.
  • Add a unit test ensuring queued tasks include project-overridden config; document the pull-mode contract.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

  • processing_services/README.md — Documents the updated pull-mode task/result contract including the new config field.
  • ami/ml/schemas.py — Extends Pydantic schemas to include config on tasks/results.
  • ami/ml/orchestration/jobs.py — Populates task config from pipeline.get_config(project_id=...) when publishing tasks.
  • ami/jobs/tests/test_jobs.py — Adds a unit test asserting project pipeline config overrides are embedded in queued tasks.


Comment on lines 89 to 107
```diff
@@ -101,6 +103,7 @@ def queue_images_to_nats(job: "Job", images: list[SourceImage]):
                     id=image_id,
                     image_id=image_id,
                     image_url=image_url,
+                    config=pipeline_config,
                 )
```
Copilot AI commented Apr 30, 2026

pipeline_config is a mutable dict and the same object is being attached to every PipelineProcessingTask. Because Pipeline.get_config() returns (and mutates) the underlying default_config dict, this means all queued tasks share a single config instance, which can lead to cross-task side effects if anything mutates it. Prefer passing an immutable snapshot (e.g., a shallow/deep copy) per task, or otherwise ensure the config is not shared/mutated after task creation.
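
The aliasing concern the reviewer raises can be reproduced with a minimal stand-in (`Task` here is hypothetical, not the real Pydantic model):

```python
# Minimal reproduction of the shared-mutable-dict concern; Task is a
# stand-in for PipelineProcessingTask, not the real schema.
import copy
from dataclasses import dataclass


@dataclass
class Task:
    image_id: str
    config: dict


shared = {"batch_size": 4}
aliased = [Task(image_id=str(i), config=shared) for i in range(3)]
aliased[0].config["batch_size"] = 99
assert aliased[1].config["batch_size"] == 99  # cross-task side effect

fresh = {"batch_size": 4}
isolated = [Task(image_id=str(i), config=copy.deepcopy(fresh)) for i in range(3)]
isolated[0].config["batch_size"] = 99
assert isolated[1].config["batch_size"] == 4  # per-task snapshot is safe
```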

Comment on lines +95 to +97
> `config` mirrors `PipelineRequest.config` from the synchronous HTTP path. It is derived from the pipeline's `default_config` merged with any per-project `ProjectPipelineConfig` override. It may be `null` if no config is set.
>
> Workers should read `config` from each task and apply it to their processing. If `config` is absent or null, fall back to worker-level defaults (e.g. environment variables).
Copilot AI commented Apr 30, 2026

The contract says config “may be null if no config is set”, but with the current Antenna code path pipeline.get_config(...) always returns a dict (possibly empty) when a pipeline exists; tasks are not normally queued without a pipeline. Consider updating this to reflect actual behavior (empty object vs null), or clarify the specific scenario where null is expected.

Suggested change

```diff
-`config` mirrors `PipelineRequest.config` from the synchronous HTTP path. It is derived from the pipeline's `default_config` merged with any per-project `ProjectPipelineConfig` override. It may be `null` if no config is set.
-Workers should read `config` from each task and apply it to their processing. If `config` is absent or null, fall back to worker-level defaults (e.g. environment variables).
+`config` mirrors `PipelineRequest.config` from the synchronous HTTP path. It is derived from the pipeline's `default_config` merged with any per-project `ProjectPipelineConfig` override. In the normal Antenna code path, it is an object; if no defaults or overrides are set, it may be an empty object.
+Workers should read `config` from each task and apply it to their processing. If `config` is unexpectedly absent or `null` (for example, due to a malformed or legacy payload), fall back to worker-level defaults (e.g. environment variables).
```
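
The fallback rule described in both versions of the contract can be sketched as follows. `WORKER_BATCH_SIZE` is a hypothetical environment variable used for illustration, not one the workers actually define:

```python
# Sketch of the worker-side fallback; WORKER_BATCH_SIZE is a
# hypothetical environment variable used only for illustration.
import os
from typing import Optional


def effective_config(task_config: Optional[dict]) -> dict:
    if task_config:  # present and non-empty
        return task_config
    # absent, null, or empty: fall back to worker-level defaults
    return {"batch_size": int(os.environ.get("WORKER_BATCH_SIZE", "8"))}


os.environ.pop("WORKER_BATCH_SIZE", None)
assert effective_config({"batch_size": 2}) == {"batch_size": 2}
assert effective_config(None)["batch_size"] == 8
assert effective_config({})["batch_size"] == 8
```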

Comment thread on processing_services/README.md (outdated)

> Workers should read `config` from each task and apply it to their processing. If `config` is absent or null, fall back to worker-level defaults (e.g. environment variables).
>
> When returning results, populate `PipelineResultsResponse.config` with the config that was actually used, so Antenna can record it for auditing.
Copilot AI commented Apr 30, 2026

This doc implies Antenna will “record [results].config for auditing”, but the backend currently doesn’t appear to persist or otherwise use PipelineResultsResponse.config when saving results (it’s parsed but ignored). Either implement recording on the Antenna side, or soften/reword this guidance so workers don’t expect the value to be stored/visible.

Suggested change

```diff
-When returning results, populate `PipelineResultsResponse.config` with the config that was actually used, so Antenna can record it for auditing.
+When returning results, you may populate `PipelineResultsResponse.config` with the config that was actually used so your worker reports its effective settings consistently. Do not assume Antenna currently persists, displays, or audits this value when saving results.
```

coderabbitai Bot (Contributor) left a comment

Actionable comments posted: 2


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9c37f3cb-90f4-4eef-910f-3cd002c36a3d

📥 Commits

Reviewing files that changed from the base of the PR and between a347d55 and 9eb923e.

📒 Files selected for processing (4)
  • ami/jobs/tests/test_jobs.py
  • ami/ml/orchestration/jobs.py
  • ami/ml/schemas.py
  • processing_services/README.md

Comment on lines +578 to +584

```python
        pipeline.default_config = PipelineRequestConfigParameters({"example_param": "default"})
        pipeline.save()
        # _create_pipeline already called pipeline.projects.add(self.project) which
        # created a ProjectPipelineConfig row; update it rather than creating a duplicate.
        ProjectPipelineConfig.objects.filter(project=self.project, pipeline=pipeline).update(
            config={"example_param": "project_override"}
        )
```

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Test does not verify default-key preservation in merge.

Line 578 and Line 583 only exercise an overridden key, so Line 619 cannot catch regressions where defaults are dropped instead of merged. Add one default-only key and assert it survives in task.config.

Suggested test tightening

```diff
-        pipeline.default_config = PipelineRequestConfigParameters({"example_param": "default"})
+        pipeline.default_config = PipelineRequestConfigParameters(
+            {"example_param": "default", "default_only_param": 7}
+        )
@@
         self.assertIsNotNone(task.config)
         self.assertEqual(task.config.get("example_param"), "project_override")
+        self.assertEqual(task.config.get("default_only_param"), 7)
```

Also applies to: 619-619


Comment thread on processing_services/README.md (outdated)
Comment on lines +99 to +100

> When returning results, populate `PipelineResultsResponse.config` with the config that was actually used, so Antenna can record it for auditing.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

README contract is ahead of in-repo example/minimal implementations.

This says workers should populate PipelineResultsResponse.config, but current in-repo examples still omit it (processing_services/minimal/api/schemas.py:213-244, processing_services/example/api/pipelines.py:117-140). Please add a short note here that those examples need the companion schema update.


mihow and others added 2 commits April 29, 2026 19:15
…nfig

Previously `config = self.default_config; config.update(...)` mutated the
in-memory default_config, leaking project-scoped overrides into subsequent
get_config() calls (and into other request paths reading default_config).

Wrap with PipelineRequestConfigParameters(self.default_config or {}) so
each call gets a fresh dict.

Adds regression test that resolves project-scoped config first, then
asserts default_config is untouched.

Co-Authored-By: Claude <noreply@anthropic.com>
…a config

After a worker echoes back the pipeline config it actually used (in
PipelineResultsResponse.config), compare it against what Antenna would
resolve today via Pipeline.get_config(project_id=...). Log a warning
on mismatch — drift is not enforced, just observable.

Surfaces the case where ProjectPipelineConfig was edited mid-job, or a
worker is running stale config. Persisted audit trails are a future
addition; the README is updated to reflect "log" rather than "record."

Adds two regression tests in ami.jobs.tests.test_tasks for the drift
warning and the matching-config no-warn path.

Also drops a small proposal at docs/claude/planning/pipeline-config-job-level-fetch.md
for moving the per-task config field to a job-level fetch endpoint, since
all tasks in a job share the same config.

Co-Authored-By: Claude <noreply@anthropic.com>
mihow added a commit to RolnickLab/ami-data-companion that referenced this pull request Apr 30, 2026
… generic dict

AntennaPipelineProcessingTask.config and _process_batch's pipeline_config
arg are now typed as PipelineConfigRequest | None rather than dict | None.
Pydantic parses the incoming config into the typed model on receive, so
unknown keys are dropped at the boundary and known fields are validated.

If we want more config parameters, update PipelineConfigRequest in
trapdata/api/schemas.py — strict on the wire, not stringly-typed.

Worker's response construction simplifies from PipelineConfigRequest(**(d or {}))
to (d or PipelineConfigRequest()) since d is already a typed instance.

Companion to RolnickLab/antenna#1279.

Co-Authored-By: Claude <noreply@anthropic.com>
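
The boundary behavior this commit describes — unknown keys dropped, known fields validated — can be approximated with a stdlib stand-in. The companion PR actually uses a Pydantic `PipelineConfigRequest` model in `trapdata/api/schemas.py`, which handles this natively; the dataclass below only illustrates the idea:

```python
# Stdlib approximation of the typed boundary; the companion PR uses a
# Pydantic PipelineConfigRequest model, which handles this natively.
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class PipelineConfigSketch:
    example_param: Optional[str] = None


def parse_config(raw: dict) -> PipelineConfigSketch:
    known = {f.name for f in fields(PipelineConfigSketch)}
    # unknown keys are dropped at the boundary; known fields keep
    # their declared types and defaults
    return PipelineConfigSketch(**{k: v for k, v in raw.items() if k in known})


cfg = parse_config({"example_param": "x", "unknown_key": 1})
assert cfg.example_param == "x"
assert not hasattr(cfg, "unknown_key")
assert parse_config({}).example_param is None
```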
coderabbitai Bot (Contributor) left a comment

Actionable comments posted: 3


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e1104f0e-be88-42ee-85e6-4b74f39b615a

📥 Commits

Reviewing files that changed from the base of the PR and between 9eb923e and 7141929.

📒 Files selected for processing (6)
  • ami/jobs/tasks.py
  • ami/jobs/tests/test_tasks.py
  • ami/ml/models/pipeline.py
  • ami/ml/tests.py
  • docs/claude/planning/pipeline-config-job-level-fetch.md
  • processing_services/README.md
✅ Files skipped from review due to trivial changes (1)
  • processing_services/README.md

Comment thread on ami/jobs/tasks.py
Comment on lines +314 to +317

```python
job.logger.warning(
    f"Pipeline config drift on job {job_id}: worker used {echoed_config}, "
    f"current Antenna config is {current_config}"
)
```

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid logging full pipeline config payloads in drift warnings.

This logs complete worker/current config values on every drift event. In practice that can expose sensitive fields and create very large per-job logs. Log a compact diff summary (e.g., differing keys/count) instead of full payloads.

Proposed fix

```diff
-                if echoed_config != current_config:
-                    job.logger.warning(
-                        f"Pipeline config drift on job {job_id}: worker used {echoed_config}, "
-                        f"current Antenna config is {current_config}"
-                    )
+                if echoed_config != current_config:
+                    differing_keys = sorted(
+                        key
+                        for key in (set(echoed_config) | set(current_config))
+                        if echoed_config.get(key) != current_config.get(key)
+                    )
+                    job.logger.warning(
+                        "Pipeline config drift on job %s: differing_keys=%s",
+                        job_id,
+                        differing_keys,
+                    )
```

Comment on lines +532 to +577

```python
        success_data = PipelineResultsResponse(
            pipeline="test-pipeline",
            algorithms={},
            total_time=1.0,
            source_images=[SourceImageResponse(id=str(self.images[0].pk), url="http://example.com/test_image_0.jpg")],
            detections=[],
            errors=None,
            config={"example_config_param": 99},
        ).dict()

        with self.assertLogs(level="WARNING") as cm:
            process_nats_pipeline_result.apply(
                kwargs={"job_id": self.job.pk, "result_data": success_data, "reply_subject": "reply.drift"}
            )

        self.assertTrue(
            any("Pipeline config drift" in msg for msg in cm.output),
            f"Expected drift warning in logs, got: {cm.output}",
        )

    @patch("ami.jobs.tasks.TaskQueueManager")
    def test_pipeline_config_match_does_not_warn(self, mock_manager_class):
        """When echoed config matches current pipeline config, no drift warning is logged."""
        from ami.ml.schemas import PipelineRequestConfigParameters

        self._setup_mock_nats(mock_manager_class)

        self.pipeline.default_config = PipelineRequestConfigParameters({"example_config_param": 5})
        self.pipeline.save()

        success_data = PipelineResultsResponse(
            pipeline="test-pipeline",
            algorithms={},
            total_time=1.0,
            source_images=[SourceImageResponse(id=str(self.images[0].pk), url="http://example.com/test_image_0.jpg")],
            detections=[],
            errors=None,
            config={"example_config_param": 5},
        ).dict()

        # assertLogs requires at least one log; capture INFO so the test doesn't
        # spuriously fail when no WARNING is emitted (the assertion below).
        with self.assertLogs(level="INFO") as cm:
            process_nats_pipeline_result.apply(
                kwargs={"job_id": self.job.pk, "result_data": success_data, "reply_subject": "reply.match"}
            )
```

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Drift tests currently run through a failing save path (missing detector).

These two tests construct a “successful” result but the fixture pipeline has no detection algorithm, so save_results can error independently of drift behavior. Add a minimal detector to keep the test on a clean happy path.

Proposed fix

```diff
     def test_pipeline_config_drift_logs_warning(self, mock_manager_class):
@@
-        self._setup_mock_nats(mock_manager_class)
+        mock_manager = self._setup_mock_nats(mock_manager_class)
@@
         self.pipeline.default_config = PipelineRequestConfigParameters({"example_config_param": 5})
         self.pipeline.save()
+        self.pipeline.algorithms.add(
+            Algorithm.objects.create(
+                name="drift-detector",
+                key="drift-detector",
+                task_type=AlgorithmTaskType.LOCALIZATION,
+            )
+        )
@@
         with self.assertLogs(level="WARNING") as cm:
             process_nats_pipeline_result.apply(
                 kwargs={"job_id": self.job.pk, "result_data": success_data, "reply_subject": "reply.drift"}
             )
+        mock_manager.acknowledge_task.assert_called_once_with("reply.drift")
@@
     def test_pipeline_config_match_does_not_warn(self, mock_manager_class):
@@
-        self._setup_mock_nats(mock_manager_class)
+        mock_manager = self._setup_mock_nats(mock_manager_class)
@@
         self.pipeline.default_config = PipelineRequestConfigParameters({"example_config_param": 5})
         self.pipeline.save()
+        self.pipeline.algorithms.add(
+            Algorithm.objects.create(
+                name="match-detector",
+                key="match-detector",
+                task_type=AlgorithmTaskType.LOCALIZATION,
+            )
+        )
@@
         with self.assertLogs(level="INFO") as cm:
             process_nats_pipeline_result.apply(
                 kwargs={"job_id": self.job.pk, "result_data": success_data, "reply_subject": "reply.match"}
             )
+        mock_manager.acknowledge_task.assert_called_once_with("reply.match")
```

Comment on lines +24 to +27
```
GET /api/v2/jobs/{job_id}/
```


⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add a language to the fenced code block.

This block is missing a fence language and will trip markdownlint MD040 in repos where docs linting is enforced.

Proposed fix

````diff
-```
+```http
 GET /api/v2/jobs/{job_id}/
````
🧰 Tools
🪛 markdownlint-cli2 (0.22.1)

[warning] 24-24: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


Labels: none yet
Projects: none yet

Development

Successfully merging this pull request may close these issues:

  • Propagate PipelineRequestConfigParameters through pull-mode (NATS) tasks

2 participants