Fix `airflow dags clear` clearing the wrong day for non-UTC partitioned timetables
#67717
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -126,7 +126,15 @@ def dag_delete(args) -> None: | |
| @providers_configuration_loaded | ||
| @provide_session | ||
| def dag_clear(args, *, session: Session = NEW_SESSION) -> None: | ||
| """Clear Dag runs selected by run_id, partition_key, or a partition_date window.""" | ||
| """ | ||
| Clear Dag runs selected by run_id, partition_key, or a partition_date window. | ||
|
|
||
| When a partition_date window is given, both bounds are **day-granular** and | ||
| anchored in the timetable's timezone for tz-aware partitioned timetables. | ||
| --partition-date-start is the inclusive start local calendar day; | ||
| --partition-date-end is the inclusive end local calendar day (any | ||
| time-of-day component in either value is ignored). | ||
| """ | ||
| has_range = args.partition_date_start is not None or args.partition_date_end is not None | ||
| selectors_used = sum([args.run_id is not None, args.partition_key is not None, has_range]) | ||
| if selectors_used == 0: | ||
|
|
@@ -157,10 +165,52 @@ def dag_clear(args, *, session: Session = NEW_SESSION) -> None: | |
| query = query.where(DagRun.partition_key == args.partition_key) | ||
| else: | ||
| query = query.where(DagRun.partition_date.is_not(None)) | ||
| if args.partition_date_start is not None: | ||
| query = query.where(DagRun.partition_date >= args.partition_date_start) | ||
| if args.partition_date_end is not None: | ||
| query = query.where(DagRun.partition_date <= args.partition_date_end) | ||
| tt_tz = getattr(dag.timetable, "timezone", None) if dag.timetable.partitioned else None | ||
| if tt_tz is not None: | ||
| # Partitioned runs are stored as local-midnight UTC instants; compare at day | ||
| # granularity in the timetable's timezone rather than at the raw UTC instant. | ||
| if args.partition_date_start is not None: | ||
| start_label = args.partition_date_start.date() | ||
| lower_utc = timezone.convert_to_utc( | ||
| timezone.make_aware( | ||
| datetime.datetime(start_label.year, start_label.month, start_label.day), | ||
| tt_tz, | ||
| ) | ||
| ) | ||
| query = query.where(DagRun.partition_date >= lower_utc) | ||
| if args.partition_date_end is not None: | ||
| end_label = args.partition_date_end.date() | ||
| # Half-open upper bound: include all of the end local calendar day. | ||
| next_day = datetime.date(end_label.year, end_label.month, end_label.day) + datetime.timedelta( | ||
| days=1 | ||
| ) | ||
| upper_utc = timezone.convert_to_utc( | ||
| timezone.make_aware( | ||
| datetime.datetime(next_day.year, next_day.month, next_day.day), | ||
| tt_tz, | ||
| ) | ||
| ) | ||
| query = query.where(DagRun.partition_date < upper_utc) | ||
| else: | ||
| # No timetable timezone: partition_date values are midnight-anchored UTC dates, | ||
| # so time-of-day on the CLI flags is not meaningful — truncate to calendar day. | ||
| if args.partition_date_start is not None: | ||
| start_day = args.partition_date_start.date() | ||
| query = query.where( | ||
| DagRun.partition_date | ||
| >= datetime.datetime( | ||
| start_day.year, start_day.month, start_day.day, tzinfo=datetime.timezone.utc | ||
| ) | ||
| ) | ||
| if args.partition_date_end is not None: | ||
| end_day = args.partition_date_end.date() | ||
| next_day = end_day + datetime.timedelta(days=1) | ||
| query = query.where( | ||
| DagRun.partition_date | ||
| < datetime.datetime( | ||
| next_day.year, next_day.month, next_day.day, tzinfo=datetime.timezone.utc | ||
| ) | ||
| ) | ||
| query = query.order_by(DagRun.partition_date, DagRun.run_id) | ||
|
|
||
| runs = list(session.execute(query).all()) | ||
|
|
||
`tt_tz` is resolved by probing for a `.timezone` attribute, which only `CronMixin`-based timetables have (this PR adds the property). `PartitionedAssetTimetable` (`timetables/simple.py:267`) is also `partitioned = True` but has no `.timezone`, so it silently takes the no-tz branch. That's correct today if asset-partition dates are genuinely UTC-anchored, but the dispatch is duck-typed: any future tz-aware partitioned timetable that doesn't expose `.timezone` will silently fall back to the UTC branch and reintroduce the exact off-by-one this PR fixes, with no error to flag it. Worth either putting the tz accessor on the partitioned-timetable contract so it's explicit which timetables are tz-aware, or branching on a known type.

Minor, related: the two day-bound blocks are nearly identical across the tz and no-tz paths and could share a `_day_bounds(label, tz)` helper to keep them from drifting.
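A rough sketch of what such a shared helper might look like, using only the standard library. The return shape (a half-open `[lower, upper)` pair of UTC instants) and the treatment of `tz=None` as UTC are assumptions made for illustration, not something the PR defines. It also assumes `tz` is a `tzinfo` that is safe to pass to the `datetime` constructor (a fixed offset or `zoneinfo`-style zone); the PR itself goes through `timezone.make_aware` and `timezone.convert_to_utc` instead.

```python
from __future__ import annotations

import datetime


def _day_bounds(
    label: datetime.date, tz: datetime.tzinfo | None
) -> tuple[datetime.datetime, datetime.datetime]:
    """Return the half-open UTC window [lower, upper) covering calendar day `label`.

    Hypothetical helper: `tz=None` is treated as UTC (an assumption).
    """
    zone = tz if tz is not None else datetime.timezone.utc
    # Step on the calendar date, not by adding 24 hours to an instant, so the
    # upper bound is the next local midnight even across a DST change.
    next_label = label + datetime.timedelta(days=1)
    lower = datetime.datetime(label.year, label.month, label.day, tzinfo=zone)
    upper = datetime.datetime(
        next_label.year, next_label.month, next_label.day, tzinfo=zone
    )
    utc = datetime.timezone.utc
    return lower.astimezone(utc), upper.astimezone(utc)


# Fixed +09:00 offset used as a stand-in for a timetable timezone.
jst = datetime.timezone(datetime.timedelta(hours=9))
lower, upper = _day_bounds(datetime.date(2024, 3, 10), jst)

# A run for local day 2024-03-10, stored as its local-midnight UTC instant.
stored = datetime.datetime(2024, 3, 9, 15, tzinfo=datetime.timezone.utc)
in_local_day = lower <= stored < upper

# The same label interpreted as a UTC day misses that run.
utc_lower, utc_upper = _day_bounds(datetime.date(2024, 3, 10), None)
in_utc_day = utc_lower <= stored < utc_upper
```

With a helper along these lines, both branches would reduce to one or two `query.where(...)` calls each, differing only in the `tz` argument: `--partition-date-start` would use the first element of the pair and `--partition-date-end` the second.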