Google: Fix CloudComposer Dag-run empty-window success #67052

Open
Vamsi-klu wants to merge 2 commits into apache:main from Vamsi-klu:fix/57512-composer-dag-run-sensor-empty-window

Conversation

@Vamsi-klu
Contributor

@Vamsi-klu Vamsi-klu commented May 16, 2026

Fix CloudComposerDAGRunSensor and CloudComposerDAGRunTrigger so they only succeed when at least one Dag run falls inside the requested execution window and every in-window run is in an allowed state.

Why this change is needed:

  • the current window-checking helper returns success when no in-window Dag runs are found, as long as it also does not see an in-window disallowed state
  • that means unrelated out-of-window Dag runs can make the sensor succeed even though no relevant Dag run exists for the requested window
  • the same behavior exists in both the sync sensor and the deferrable trigger, so both paths need to be corrected together

What changes in behavior:

  • if all returned Dag runs are outside the execution window, the sensor now waits instead of succeeding
  • if at least one Dag run is inside the window, success still requires all in-window runs to be in an allowed state
  • composer_dag_run_id behavior is unchanged
  • open-interval boundary behavior is preserved
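
The behavior described above can be sketched roughly as follows. This is a minimal illustration, not the provider's actual helper: the function name `window_succeeded` and the dict keys `execution_date` and `state` are assumptions made for the example.

```python
from datetime import datetime, timedelta, timezone


def window_succeeded(dag_runs, start, end, allowed_states):
    """Return True only if at least one run lies strictly inside (start, end)
    and every such run is in an allowed state.

    Hypothetical helper: the run shape (dicts with "execution_date" and
    "state") is an assumption for illustration only.
    """
    # Open interval: runs exactly on a boundary do not count as in-window.
    in_window = [run for run in dag_runs if start < run["execution_date"] < end]
    if not in_window:
        # Empty window: keep waiting instead of reporting success.
        return False
    return all(run["state"] in allowed_states for run in in_window)


end = datetime(2026, 5, 16, tzinfo=timezone.utc)
start = end - timedelta(days=1)
allowed = {"success"}

outside = {"execution_date": start - timedelta(hours=1), "state": "success"}
inside_ok = {"execution_date": start + timedelta(hours=1), "state": "success"}
inside_failed = {"execution_date": start + timedelta(hours=2), "state": "failed"}
on_boundary = {"execution_date": start, "state": "success"}

only_outside = window_succeeded([outside], start, end, allowed)
mixed = window_succeeded([outside, inside_ok], start, end, allowed)
rejected = window_succeeded([inside_ok, inside_failed], start, end, allowed)
boundary_only = window_succeeded([on_boundary], start, end, allowed)
```

The four calls mirror the out-of-window, mixed-window, rejected in-window, and boundary-only cases listed under "What is covered".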

What is covered:

  • sync sensor unit tests for out-of-window, mixed-window, rejected in-window, and boundary-only cases
  • deferrable trigger unit tests for continued polling on out-of-window results and success on an in-window allowed run
  • additional sensor + trigger tests for the composer_dag_run_id branch (template_fields, Jinja rendering, range-vs-id precedence, parse-time source guard, trigger branch precedence, trigger polling)
  • system test example restructured to pin each CloudComposerDAGRunSensor to the dag_run_id minted by an upstream CloudComposerTriggerDAGRunOperator (via XCom on the newly templated composer_dag_run_id field). The example no longer relies on execution_range, which was previously evaluated at parse time, before the Composer env existed. That is the failure mode that caused PR #61046 ("Fixed CloudComposerDAGRunSensor to return False when no runs exist in execution_range") to be reverted
  • standalone simulation harness (dev/simulate_composer_system_test.py) that mocks all GCP surfaces and drives the example via dag.test() end-to-end, including the deferred sensor's trigger inline, so contributors without GCP credentials can reproduce the proof
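
For readers wondering what a "parse-time source guard" might look like, here is one possible sketch using the standard-library `ast` module. It is not the test added in this PR: the function name `find_datetime_now_calls` and both sample sources (including the XCom template string) are hypothetical and exist only to illustrate the idea.

```python
import ast


def find_datetime_now_calls(source: str) -> list[int]:
    """Return the line numbers of datetime.now() calls found in `source`.

    Hypothetical guard: it flags both `datetime.now()` and
    `datetime.datetime.now()`, wherever they appear in the module.
    """
    hits = []
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "now"
        ):
            receiver = node.func.value
            # `datetime` as a bare name, or as the trailing attribute of
            # `datetime.datetime`.
            if isinstance(receiver, ast.Name):
                name = receiver.id
            else:
                name = getattr(receiver, "attr", None)
            if name == "datetime":
                hits.append(node.lineno)
    return sorted(hits)


# Illustrative sources only; neither is copied from the real example Dag.
BAD_SOURCE = (
    "from datetime import datetime, timedelta\n"
    "execution_range = [datetime.now() - timedelta(1), datetime.now()]\n"
)
GOOD_SOURCE = (
    "composer_dag_run_id = '{{ ti.xcom_pull(task_ids=\"trigger_dag_run\") }}'\n"
)

bad_hits = find_datetime_now_calls(BAD_SOURCE)
good_hits = find_datetime_now_calls(GOOD_SOURCE)
```

Because the check walks the syntax tree rather than matching text, a template string that merely mentions `datetime.now()` would not trigger it.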

Follow-up:

closes: #57512


Was generative AI tooling used to co-author this PR?
  • Yes — Codex (GPT-5) and Claude Code (Opus 4.7 1M)

Generated-by: Codex (GPT-5) and Claude Code (Opus 4.7 1M) following the guidelines

@Vamsi-klu Vamsi-klu requested a review from shahar1 as a code owner May 16, 2026 21:41
@boring-cyborg boring-cyborg Bot added area:providers provider:google Google (including GCP) related issues labels May 16, 2026
Contributor

@shahar1 shahar1 left a comment

Could you please demonstrate that system tests pass when the proposed changes are included?
See past discussion here: #61046

Vamsi-klu added a commit to Vamsi-klu/airflow that referenced this pull request May 17, 2026
Reorder the example Dag so each CloudComposerTriggerDAGRunOperator runs
before its sibling CloudComposerDAGRunSensor, and have the sensors pull
the freshly minted run id from XCom via the new templated
composer_dag_run_id field. The previous "execution_range=[now -1d, now]"
form was evaluated at Dag-parse time -- before the Composer environment
existed -- so the empty-window fix from PR apache#67052 would have caused the
sensor to wait forever, the same way it did when PR apache#61046 was reverted.

Adds a defensive timeout on the Dag-run sensors so any future regression
back into the windowed code path fails CI fast instead of hanging the
worker. Leaves the external-task sensors on the legacy windowed path
deliberately; the analogous bug in _check_task_instances_states is
tracked at apache#67051 and the inline
note flags that the example must be restructured at the same time when
that issue lands.

Adds unit coverage for: the new template_fields entry, Jinja rendering
of an xcom_pull template, run-id-path success/wait/precedence in poke(),
parse-time source guard against re-introducing datetime.now(), and the
matching trigger paths (run-id branch precedence + polling). Ships a
local simulation harness (dev/simulate_composer_system_test.py) that
mocks all GCP surfaces and drives the example via dag.test() end-to-end
so contributors without GCP credentials can reproduce the proof.
@Vamsi-klu
Contributor Author

Hi @shahar1 -- addressed the system-test concern.

Why PR #61046 had to be reverted (revert commit 14995f1):
the example called CloudComposerDAGRunSensor without execution_range, so the sensor inferred the window [logical_date - 1d, logical_date]. The Composer environment was created mid-DAG-run, which meant no airflow_monitoring runs ever fell inside that past window. The buggy code returned True (false positive); the empty-window fix correctly returns False and the example then hung forever.

The example patch in the earlier revision of this PR tried to side-step the issue with execution_range=[datetime.now() - timedelta(1), datetime.now()], but that's evaluated at DAG-parse time -- roughly half an hour before the sensor pokes and before the Composer env exists. Same hang mode, just hidden.
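
To make the parse-time problem concrete, here is a small sketch with made-up timestamps. The 30-minute gap comes from the estimate above; every other value and name is assumed for illustration.

```python
from datetime import datetime, timedelta, timezone

# Assumed moment at which the Dag file is parsed.
parse_time = datetime(2026, 5, 16, 12, 0, tzinfo=timezone.utc)

# A window built from datetime.now() in the Dag file is frozen at parse time.
frozen_window = (parse_time - timedelta(days=1), parse_time)

# The Composer env only exists later, so the first remote run starts
# roughly half an hour after parsing.
run_time = parse_time + timedelta(minutes=30)

# Open-interval check against the frozen window: the run is too late.
in_frozen_window = frozen_window[0] < run_time < frozen_window[1]

# For contrast, a window computed when the sensor pokes (assumed to be
# five minutes after the run starts) would contain the run.
poke_time = run_time + timedelta(minutes=5)
poke_window = (poke_time - timedelta(days=1), poke_time)
in_poke_window = poke_window[0] < run_time < poke_window[1]
```

With the empty-window fix in place, the frozen window never matches and the sensor waits indefinitely, which is why the example now pins the run id instead of relying on a window.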

What this commit changes:

Verification I can run locally without GCP credentials:

  • six new sensor unit tests + two new trigger unit tests covering the composer_dag_run_id branch (template_fields, Jinja render, poke success/wait, range-vs-id precedence, parse-time source guard, trigger branch precedence, trigger polling) -- all 75 tests in providers/google/tests/unit/google/cloud/{sensors,triggers}/test_cloud_composer.py pass.
  • dev/simulate_composer_system_test.py, a standalone harness that loads the example via importlib, mocks every GCP surface the example touches (googleapiclient + CloudComposerHook + CloudComposerAsyncHook + Environment / ImageVersion / ExecuteAirflowCommandResponse.to_dict), runs dag.test() end-to-end, and reports per-task state. The deferred trigger runs inline under dag.test(), so the deferrable sensor path is exercised too.

Simulation result:

$ uv run --project providers/google python dev/simulate_composer_system_test.py
loading example dag from .../example_cloud_composer.py
registering dag bundle and serializing dag
running dag.test() on composer
OK: dag.test() completed for composer -- final state: success
  create_env                       -> success
  dag_run_sensor                   -> success
  defer_create_env                 -> success
  defer_dag_run_sensor             -> success
  defer_delete_env                 -> success
  defer_external_task_sensor       -> success
  defer_run_airflow_cli_cmd        -> success
  defer_trigger_dag_run            -> success
  defer_update_env                 -> success
  delete_env                       -> success
  external_task_sensor             -> success
  get_env                          -> success
  get_project_number               -> success
  image_versions                   -> success
  list_envs                        -> success
  run_airflow_cli_cmd              -> success
  trigger_dag_run                  -> success
  update_env                       -> success
  watcher                          -> skipped

Both the sync dag_run_sensor and the deferred defer_dag_run_sensor matched on the trigger ops' run id via the composer_dag_run_id branch.

@VladaZakharova @MaksYermak -- would appreciate a confirmation pass against real Composer infra when convenient, since the simulation can only validate the control flow, not the actual REST API contracts.


Drafted-by: Claude Code (Opus 4.7 1M); reviewed by @Vamsi-klu before posting

@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label May 18, 2026
Vamsi-klu added 2 commits May 21, 2026 08:41
@nnguyen168

Hi @shahar1 @Vamsi-klu I ran the unit tests and local simulation for this PR.

Environment:

  • macOS (Darwin)
  • Python 3.12.11
  • uv 0.11.16

Unit Test Results:

  • test_cloud_composer.py (sensors): 59/59 passed
  • test_cloud_composer.py (triggers): 16/16 passed

Local Simulation (dev/simulate_composer_system_test.py):

  • Final state: success
  • Both sync dag_run_sensor and deferred defer_dag_run_sensor matched via composer_dag_run_id branch
  • All 18 tasks completed successfully (watcher skipped as expected)

Note: I don't have GCP credentials to run the actual system tests against real Composer infrastructure. The unit tests and local simulation validate the control flow, but real GCP validation is still needed.

@nnguyen168

  OK: dag.test() completed for composer -- final state: success
  the sync `dag_run_sensor` and deferred `defer_dag_run_sensor` both 
  matched on dag_run_id='manual__simulated' via the composer_dag_run_id branch.

@MaksYermak
Contributor

@nnguyen168 could you please run the existing system test for this operator and share a screenshot of the latest run?

@shahar1
Contributor

shahar1 commented May 22, 2026

When I ask for the system tests to be run, I always mean against a real GCP instance. Simulating them against a mock isn't helpful (with a possible exception for the few GCP services that have emulators).
Therefore, please remove dev/simulate_composer_system_test.py.
Since you currently lack the means to run the system tests, I'll see whether I can help with running them (I can't promise that it will happen soon).

Labels

area:providers provider:google Google (including GCP) related issues ready for maintainer review Set after triaging when all criteria pass.

Development

Successfully merging this pull request may close these issues.

CloudComposerDAGRunSensor returns success when no Dag runs found in execution_range

5 participants