Fix/stale dag detection safeguards #60013
Conversation
The help text for the --start-date and --end-date arguments only documented the YYYY-MM-DD format, but the actual implementation uses pendulum.parse(), which accepts a much wider variety of formats. This commit updates the help text to accurately document the commonly used formats:
- YYYY-MM-DD (date only)
- YYYY-MM-DDTHH:MM:SS (datetime)
- YYYY-MM-DDTHH:MM:SS±HH:MM (datetime with timezone offset, ISO 8601)
The help text also references pendulum.parse() to indicate that additional formats are supported, improving clarity for users.
Fixes: incomplete documentation of date format options
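For illustration, a minimal sketch of the formats listed above being handled by pendulum.parse() (the date values are arbitrary examples, not values from the PR):

```python
import pendulum

# Date only: parsed as midnight, UTC by default
print(pendulum.parse("2025-01-15"))

# Date and time, no timezone
print(pendulum.parse("2025-01-15T08:30:00"))

# Date, time, and explicit UTC offset (ISO 8601)
print(pendulum.parse("2025-01-15T08:30:00+05:30"))
```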
This change addresses an issue where the scheduler crashes when executors (such as the AWS Lambda Executor) generate external executor IDs exceeding 250 characters. Long dag_id, task_id, and run_id combinations can easily exceed this limit, causing database constraint violations.
Changes:
- Created migration 0094_3_2_0_increase_external_executor_id_length.py to alter the external_executor_id column in both the task_instance and task_instance_history tables from VARCHAR(250) to VARCHAR(1000)
- Updated the TaskInstance model to use StringID(length=1000) for the external_executor_id column
- Updated the TaskInstanceHistory model to use StringID(length=1000) for the external_executor_id column
This fix allows executors with longer identifiers to work properly without causing StringDataRightTruncation errors.
Fixes: #<issue_number>
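A minimal sketch of what such a column-widening migration could look like with Alembic (the revision identifiers are placeholders, not the contents of the actual 0094 migration file):

```python
"""Increase external_executor_id length from 250 to 1000 characters."""
import sqlalchemy as sa
from alembic import op

# Placeholder revision identifiers for illustration only.
revision = "0094_placeholder"
down_revision = "0093_placeholder"

TABLES = ("task_instance", "task_instance_history")


def upgrade():
    # Widen the column on both tables so long executor-generated IDs fit.
    for table in TABLES:
        op.alter_column(
            table,
            "external_executor_id",
            existing_type=sa.String(length=250),
            type_=sa.String(length=1000),
        )


def downgrade():
    # Shrinking back may truncate or reject values longer than 250 characters.
    for table in TABLES:
        op.alter_column(
            table,
            "external_executor_id",
            existing_type=sa.String(length=1000),
            type_=sa.String(length=250),
        )
```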
Prevent unnecessary cache invalidation in the useRefreshOnNewDagRuns hook when the component first mounts by adding an early return when previousDagRunIdRef is undefined.
On initial page load, when latestDagRunId is fetched for the first time, it would differ from the undefined previousDagRunIdRef, causing cache invalidation and duplicate HTTP requests to endpoints like /ui/grid/runs, /ui/grid/structure, etc.
The fix sets previousDagRunIdRef without invalidating queries on the first render, ensuring cache invalidation only occurs when a genuinely new DAG run appears (not on initial load). This reduces unnecessary network traffic and improves page load performance while preserving the intended refresh behavior when new DAG runs complete.
Handle the NotFound exception when deleting already-deleted Dataproc clusters to prevent failures in ephemeral cluster cleanup patterns.
When using DataprocCreateClusterOperator with delete_on_error=True (the default), failed cluster creation automatically deletes the cluster. A downstream DataprocDeleteClusterOperator with TriggerRule.ALL_DONE would then fail with a NotFound error when attempting to delete the already-deleted cluster.
This change makes the operator idempotent by:
- Catching NotFound exceptions in both synchronous and deferrable modes
- Logging an informational message when the cluster is already deleted
- Completing successfully instead of failing
This enables clean ephemeral cluster patterns: Create Cluster -> Run Jobs -> Delete Cluster (ALL_DONE trigger rule). The operator now succeeds whether the cluster exists or not, preventing cleanup task failures from masking actual upstream failures in monitoring.
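A simplified sketch of the idempotent-delete behaviour for the synchronous path (the hook call and attribute names are abbreviated assumptions, not the provider's exact code):

```python
from google.api_core.exceptions import NotFound


def execute(self, context):
    try:
        operation = self.hook.delete_cluster(  # assumed hook method, simplified
            project_id=self.project_id,
            region=self.region,
            cluster_name=self.cluster_name,
        )
        operation.result()  # block until deletion completes
    except NotFound:
        # The cluster is already gone (e.g. removed by delete_on_error during a
        # failed creation), so treat deletion as successful instead of failing.
        self.log.info(
            "Cluster %s not found; assuming it was already deleted.", self.cluster_name
        )
```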
The NeedsReviewButton component was continuously polling the
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/hitlDetails endpoint
even when there were no pending or deferred task instances. This
resulted in unnecessary API calls and increased server load.
This commit optimizes the polling behavior by:
1. First checking if there are any deferred task instances using
the lighter-weight getTaskInstances endpoint
2. Only calling the hitlDetails endpoint when deferred tasks exist
3. Using the 'enabled' option in React Query to conditionally enable
the hitlDetails query
This change significantly reduces API calls when DAGs have no
pending runs, while maintaining the same functionality when
pending actions are present.
Fixes: Issue where hitlDetails API was continuously polled even
when no pending runs existed
The scheduler was using potentially stale DagRun data when calculating next_dagrun fields, which could cause next_dagrun_create_after to be set back in time in distributed systems.
Problem:
- calculate_dagrun_date_fields() accepted a last_automated_dag_run parameter
- The parameter name implied it was always the latest run, but that wasn't always true
- The scheduler passed whatever run it was processing at the time
- In distributed systems, newer runs could exist by the time the calculation happened
- This led to next_dagrun being calculated from outdated data
Solution:
- Rename the parameter to last_automated_data_interval to clarify what it is used for in the calculation
- Update the docstring to make expectations explicit
- Add a _get_latest_automated_dagrun_data_interval() helper in the scheduler
- Always query for the actual latest automated run before calculating next_dagrun
- Ensures next_dagrun fields are always calculated from current data
This prevents race conditions where:
1. The scheduler processes run A
2. Run B is created by another process
3. The scheduler calculates next_dagrun based on A (now stale)
4. next_dagrun is set incorrectly, potentially back in time
Changes:
- Renamed last_automated_dag_run to last_automated_data_interval in DagModel.calculate_dagrun_date_fields()
- Added a helper method to query the actual latest automated run
- Updated all scheduler call sites to query before calculating
- Improved documentation to prevent future misuse
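A hedged sketch of the kind of lookup such a helper might perform (the helper name follows the commit message; the query, imports, and column names are assumptions and vary between Airflow versions):

```python
from sqlalchemy import select

from airflow.models.dagrun import DagRun
from airflow.utils.types import DagRunType


def _get_latest_automated_dagrun_data_interval(dag_id, session):
    """Fetch the newest automated run so next_dagrun is never computed from stale data."""
    latest_run = session.scalar(
        select(DagRun)
        .where(
            DagRun.dag_id == dag_id,
            DagRun.run_type.in_((DagRunType.SCHEDULED, DagRunType.BACKFILL_JOB)),
        )
        .order_by(DagRun.logical_date.desc())  # execution_date on older versions
        .limit(1)
    )
    if latest_run is None:
        return None
    return latest_run.data_interval_start, latest_run.data_interval_end
```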
…cess, failed); clarify OpenAPI docs; add tests for delete validation
- Refactor 0015_2_9_0_update_trigger_kwargs_type.py to avoid importing airflow.models.Trigger
- Remove ORM-based operations that caused issues with future schema changes
- Add a pre-commit hook to prevent ORM model imports in future migrations
- Prevent issues with downgrade when new columns are added to the Trigger model
This fixes the issue where adding new columns to the Trigger model would break downgrades to 2.9.0 due to ORM references in the migration script.
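A minimal sketch of the general pattern: operate on a lightweight SQLAlchemy table definition inside the migration rather than importing airflow.models.Trigger (the column set and the conversion helper are hypothetical; only the structure is the point):

```python
import sqlalchemy as sa
from alembic import op


def _convert_kwargs(raw):
    # Hypothetical placeholder for whatever serialization change the migration reverses.
    return raw


def downgrade():
    # Declare only the columns this migration touches, so the script no longer
    # depends on the current shape of the Trigger ORM model.
    trigger_table = sa.table(
        "trigger",
        sa.column("id", sa.Integer),
        sa.column("kwargs", sa.Text),
    )
    connection = op.get_bind()
    for row in connection.execute(sa.select(trigger_table.c.id, trigger_table.c.kwargs)):
        connection.execute(
            sa.update(trigger_table)
            .where(trigger_table.c.id == row.id)
            .values(kwargs=_convert_kwargs(row.kwargs))
        )
```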
This commit adds safeguards to prevent DAGs from being incorrectly marked as stale due to various edge cases:
1. Empty file list protection: When no DAG files are found during bundle refresh (e.g., due to transient NFS/volume mount issues), skip the deactivation check instead of marking all DAGs stale.
2. Null relative_fileloc handling: Skip DAGs with a None relative_fileloc during both the deactivate_deleted_dags and deactivate_stale_dags checks. These DAGs will be properly updated when their files are parsed.
3. Null last_parsed_time handling: Skip DAGs with a None last_parsed_time during stale detection to avoid a TypeError and ensure they are properly updated when parsed.
These issues could cause DAGs to randomly disappear in Kubernetes deployments or other environments where transient filesystem issues may occur.
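A simplified sketch of the three safeguards (function and attribute names approximate the manager.py/dag.py code paths described above, not the literal diff):

```python
import logging
from datetime import timedelta

log = logging.getLogger(__name__)


def deactivate_deleted_dags(active_dags, present_file_paths, bundle_name):
    # Safeguard 1: an empty file list usually means a transient filesystem or
    # volume-mount problem, not that every DAG file was deleted.
    if not present_file_paths:
        log.warning("No DAG files found for bundle %s; skipping deactivation check.", bundle_name)
        return

    rel_filelocs = set(present_file_paths)
    for dag in active_dags:
        # Safeguard 2: a DAG without a relative_fileloc cannot be matched against
        # the file list; leave it untouched until its file is parsed again.
        if dag.relative_fileloc is None:
            continue
        if dag.relative_fileloc not in rel_filelocs:
            dag.is_stale = True


def is_past_stale_threshold(dag, stale_threshold_seconds, now):
    # Safeguard 3: never do arithmetic on a missing last_parsed_time.
    if dag.last_parsed_time is None:
        return False
    return dag.last_parsed_time + timedelta(seconds=stale_threshold_seconds) < now
```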
Thanks in advance for taking a look at this PR. I’ll actively follow up on feedback and make any required changes promptly; happy to adjust the approach or implementation as needed.
kaxil
left a comment
Lots of things need improvement.
Please make an active effort to review your own code, test it, and write a PR description with good formatting.

Fix DAGs randomly disappearing due to stale DAG detection issues
Summary
Fixes multiple edge cases in stale DAG detection that cause DAGs to be incorrectly marked as stale/inactive, particularly in Kubernetes deployments with transient filesystem issues.
Three Root Causes Fixed:
Empty file list during bundle refresh
When bundle directories are temporarily unavailable (NFS delays, pod restarts, volume mount issues), list_py_file_paths() returns an empty list → ALL DAGs are marked stale
DAGs with None relative_fileloc
None not in rel_filelocs evaluates to True → the DAG is incorrectly marked stale
DAGs with None last_parsed_time
None + timedelta(...) raises a TypeError (and can lead to stale marking); both behaviours are shown in the sketch after this list
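The two Python behaviours behind root causes 2 and 3, shown in isolation:

```python
from datetime import datetime, timedelta

# Root cause 2: a None relative_fileloc is never "in" the list of present files.
rel_filelocs = ["dags/example_dag.py"]
print(None not in rel_filelocs)  # True -> the DAG looks deleted

# Root cause 3: arithmetic on a missing last_parsed_time blows up.
last_parsed_time = None
try:
    last_parsed_time + timedelta(minutes=5) < datetime.now()
except TypeError as err:
    print(f"TypeError: {err}")
```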
Changes:
16 lines in manager.py - Added guards for empty file list, None checks
3 lines in dag.py - Added None check
164 lines in tests - 4 comprehensive unit tests
Impact:
✅ Prevents false positives
✅ Legitimate stale DAGs still detected correctly
✅ No config changes needed
✅ Works with existing STALE_DAG_THRESHOLD
Branch: fix/stale-dag-detection-safeguards
Closes: #58717