Skip to content

Commit

Permalink
fix logic for turning schedule ticks into hour boundaries when there'…
Browse files Browse the repository at this point in the history
…s an offset (#7071)

Summary:
We were subtracting the hours to reach midnight, which works great most of the time, but does not work so well at 7AM on a spring DST morning (where going back 7 hours sends you to 11PM the previous night, because an hour was skipped). Fix that and add more test coverage of daily partitions with hour offsets.
  • Loading branch information
gibsondan committed Mar 16, 2022
1 parent adcf93f commit 2e2029d
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ def graphql_pg_extra_cmds_fn(_):
"-core_tests",
"-core_tests_old_sqlalchemy",
"-daemon_tests",
"-definitions_tests_old_pendulum",
"-general_tests",
"-scheduler_tests",
"-scheduler_tests_old_pendulum",
Expand Down
27 changes: 21 additions & 6 deletions python_modules/dagster/dagster/core/definitions/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,21 +303,36 @@ def get_cron_schedule(self) -> str:

def get_execution_time_to_partition_fn(self) -> Callable[[datetime], datetime]:
if self.schedule_type is ScheduleType.HOURLY:
# Using subtract(minutes=d.minute) here instead of .replace(minute=0) because on
# pendulum 1, replace(minute=0) sometimes changes the timezone:
# >>> a = create_pendulum_time(2021, 11, 7, 0, 0, tz="US/Central")
#
# >>> a.add(hours=1)
# <Pendulum [2021-11-07T01:00:00-05:00]>
# >>> a.add(hours=1).replace(minute=0)
# <Pendulum [2021-11-07T01:00:00-06:00]>
return lambda d: pendulum.instance(d).subtract(hours=self.offset, minutes=d.minute)
elif self.schedule_type is ScheduleType.DAILY:
return lambda d: pendulum.instance(d).subtract(
days=self.offset, hours=d.hour, minutes=d.minute
return (
lambda d: pendulum.instance(d).replace(hour=0, minute=0).subtract(days=self.offset)
)
elif self.schedule_type is ScheduleType.WEEKLY:
execution_day = cast(int, self.execution_day)
day_difference = (execution_day - (self.start.weekday() + 1)) % 7
return lambda d: pendulum.instance(d).subtract(
weeks=self.offset, days=day_difference, hours=d.hour, minutes=d.minute
return (
lambda d: pendulum.instance(d)
.replace(hour=0, minute=0)
.subtract(
weeks=self.offset,
days=day_difference,
)
)
elif self.schedule_type is ScheduleType.MONTHLY:
execution_day = cast(int, self.execution_day)
return lambda d: pendulum.instance(d).subtract(
months=self.offset, days=execution_day - 1, hours=d.hour, minutes=d.minute
return (
lambda d: pendulum.instance(d)
.replace(hour=0, minute=0)
.subtract(months=self.offset, days=execution_day - 1)
)
else:
check.assert_never(self.schedule_type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def test_time_based_partitions_invariants(
"partition_days_offset",
"current_time",
"expected_partitions",
"timezone",
],
ids=[
"partition days offset == 0",
Expand All @@ -140,6 +141,10 @@ def test_time_based_partitions_invariants(
"different start/end year",
"leap year",
"not leap year",
"partition days offset == 0, spring DST",
"partition days offset == 1, spring DST",
"partition days offset == 0, fall DST",
"partition days offset == 1, fall DST",
],
argvalues=[
(
Expand All @@ -149,6 +154,7 @@ def test_time_based_partitions_invariants(
0,
create_pendulum_time(2021, 1, 6, 1, 20),
["2021-01-01", "2021-01-02", "2021-01-03", "2021-01-04", "2021-01-05", "2021-01-06"],
None,
),
(
datetime(year=2021, month=1, day=1),
Expand All @@ -157,6 +163,7 @@ def test_time_based_partitions_invariants(
1,
create_pendulum_time(2021, 1, 6, 1, 20),
["2021-01-01", "2021-01-02", "2021-01-03", "2021-01-04", "2021-01-05"],
None,
),
(
datetime(year=2021, month=1, day=1),
Expand All @@ -165,6 +172,7 @@ def test_time_based_partitions_invariants(
2,
create_pendulum_time(2021, 1, 6, 1, 20),
["2021-01-01", "2021-01-02", "2021-01-03", "2021-01-04"],
None,
),
(
datetime(year=2021, month=1, day=1),
Expand All @@ -173,6 +181,7 @@ def test_time_based_partitions_invariants(
2,
create_pendulum_time(2021, 1, 5, 1, 20),
["2021-01-01", "2021-01-02", "2021-01-03"],
None,
),
(
datetime(year=2021, month=1, day=1),
Expand All @@ -181,6 +190,7 @@ def test_time_based_partitions_invariants(
2,
create_pendulum_time(2021, 1, 7, 1, 20),
["2021-01-01", "2021-01-02", "2021-01-03", "2021-01-04", "2021-01-05"],
None,
),
(
datetime(year=2021, month=1, day=1),
Expand All @@ -189,6 +199,7 @@ def test_time_based_partitions_invariants(
2,
create_pendulum_time(2021, 1, 8, 1, 20),
["2021-01-01", "2021-01-02", "2021-01-03", "2021-01-04", "2021-01-05", "2021-01-06"],
None,
),
(
datetime(year=2021, month=1, day=1),
Expand All @@ -197,6 +208,7 @@ def test_time_based_partitions_invariants(
2,
create_pendulum_time(2022, 1, 8, 1, 20),
["2021-01-01", "2021-01-02", "2021-01-03", "2021-01-04", "2021-01-05", "2021-01-06"],
None,
),
(
datetime(year=2021, month=1, day=1),
Expand All @@ -213,6 +225,7 @@ def test_time_based_partitions_invariants(
"2021-01-06",
"2021-01-07",
],
None,
),
(
datetime(year=2020, month=12, day=29),
Expand All @@ -221,6 +234,7 @@ def test_time_based_partitions_invariants(
0,
create_pendulum_time(2021, 1, 3, 1, 20),
["2020-12-29", "2020-12-30", "2020-12-31", "2021-01-01", "2021-01-02", "2021-01-03"],
None,
),
(
datetime(year=2020, month=2, day=28),
Expand All @@ -229,6 +243,7 @@ def test_time_based_partitions_invariants(
0,
create_pendulum_time(2020, 3, 3, 1, 20),
["2020-02-28", "2020-02-29", "2020-03-01", "2020-03-02", "2020-03-03"],
None,
),
(
datetime(year=2021, month=2, day=28),
Expand All @@ -237,16 +252,54 @@ def test_time_based_partitions_invariants(
0,
create_pendulum_time(2021, 3, 3, 1, 20),
["2021-02-28", "2021-03-01", "2021-03-02", "2021-03-03"],
None,
),
(
datetime(year=2019, month=3, day=9),
time(7, 30),
None,
0,
create_pendulum_time(2019, 3, 12, 8, 30),
["2019-03-09", "2019-03-10", "2019-03-11", "2019-03-12"],
"US/Eastern",
),
(
datetime(year=2019, month=3, day=9),
time(7, 30),
None,
1,
create_pendulum_time(2019, 3, 12, 8, 30),
["2019-03-09", "2019-03-10", "2019-03-11"],
"US/Eastern",
),
(
datetime(year=2021, month=11, day=6),
time(7, 30),
None,
0,
create_pendulum_time(2021, 11, 9, 8, 30),
["2021-11-06", "2021-11-07", "2021-11-08", "2021-11-09"],
"US/Eastern",
),
(
datetime(year=2021, month=11, day=6),
time(7, 30),
None,
1,
create_pendulum_time(2021, 11, 9, 8, 30),
["2021-11-06", "2021-11-07", "2021-11-08"],
"US/Eastern",
),
],
)
def test_time_partitions_daily_partitions(
start: datetime,
execution_time: time,
end: datetime,
end: Optional[datetime],
partition_days_offset: Optional[int],
current_time,
expected_partitions: List[str],
timezone: Optional[str],
):
with pendulum.test(current_time):
partitions = ScheduleTimeBasedPartitionsDefinition(
Expand All @@ -255,6 +308,7 @@ def test_time_partitions_daily_partitions(
execution_time=execution_time,
end=end,
offset=partition_days_offset,
timezone=timezone,
)

assert_expected_partitions(partitions.get_partitions(), expected_partitions)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import re
import time
from collections import Counter
Expand Down Expand Up @@ -259,7 +260,7 @@ def cursor_datetime_args():
# timezone-aware and timezone-naive datetimes
yield None
yield pendulum.now()
yield pendulum.now().naive()
yield datetime.datetime.now()


class TestEventLogStorage:
Expand Down
4 changes: 3 additions & 1 deletion python_modules/dagster/tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist = py{39,38,37,36}-{unix,windows}-{api_tests,cli_tests,core_tests,daemon_tests,general_tests,scheduler_tests,scheduler_tests_old_pendulum},pylint,mypy
envlist = py{39,38,37,36}-{unix,windows}-{api_tests,cli_tests,core_tests,definitions_tests_old_pendulum,daemon_tests,general_tests,scheduler_tests,scheduler_tests_old_pendulum},pylint,mypy

[testenv]
pip_version = pip==21.3.1
Expand All @@ -9,6 +9,7 @@ setenv =
windows: COVERAGE_ARGS =
deps =
scheduler_tests_old_pendulum: pendulum==1.4.4
definitions_tests_old_pendulum: pendulum==1.4.4
core_tests_old_sqlalchemy: sqlalchemy==1.3.24
-e .[test]
-e ../dagster-test
Expand All @@ -25,6 +26,7 @@ commands =
api_tests: pytest -vv ./dagster_tests/api_tests --junitxml=test_results.xml {env:COVERAGE_ARGS} --durations 10 {posargs}
cli_tests: pytest -vv ./dagster_tests/cli_tests --junitxml=test_results.xml {env:COVERAGE_ARGS} --durations 10 {posargs}
core_tests: pytest -vv ./dagster_tests/core_tests --junitxml=test_results.xml {env:COVERAGE_ARGS} --durations 10 {posargs}
definitions_tests_old_pendulum: pytest -vv ./dagster_tests/core_tests/definitions_tests --junitxml=test_results.xml {env:COVERAGE_ARGS} --durations 10 {posargs}
core_tests_old_sqlalchemy: pytest -vv ./dagster_tests/core_tests --junitxml=test_results.xml {env:COVERAGE_ARGS} --durations 10 {posargs}
daemon_tests: pytest -vv ./dagster_tests/daemon_tests --junitxml=test_results.xml {env:COVERAGE_ARGS} --durations 10 {posargs}
scheduler_tests: pytest -vv ./dagster_tests/scheduler_tests --junitxml=test_results.xml {env:COVERAGE_ARGS} --durations 10 {posargs}
Expand Down

0 comments on commit 2e2029d

Please sign in to comment.