Skip to content

Commit

Permalink
enable vixie-style cron strings (#6810)
Browse files Browse the repository at this point in the history
* support vixie cronstrings (e.g. @daily) in dagster schedules

* mypy

* pin croniter to support keyword aliases

* unpin croniter

* adjusted comment
  • Loading branch information
prha committed Mar 4, 2022
1 parent 28fe827 commit 7fa5308
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ You define a schedule by constructing a <PyObject object="ScheduleDefinition" />

### A basic schedule

Here's a simple schedule that runs a job every day, at midnight. The `cron_schedule` accepts standard [cron expressions](https://en.wikipedia.org/wiki/Cron).
Here's a simple schedule that runs a job every day, at midnight. The `cron_schedule` accepts standard [cron expressions](https://en.wikipedia.org/wiki/Cron). It also accepts `"@hourly"`, `"@daily"`, `"@weekly"`, and `"@monthly"` if your `croniter` dependency's version is >= 1.0.12.

```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_basic_schedule endbefore=end_basic_schedule
@job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ def __init__(

if not is_valid_cron_string(self._cron_schedule):
raise DagsterInvalidDefinitionError(
f"Found invalid cron schedule '{self._cron_schedule}' for schedule '{name}''. "
"Dagster recognizes cron expressions consisting of 5 space-separated fields."
f"Found invalid cron schedule '{self._cron_schedule}' for schedule '{name}''. "
"Dagster recognizes standard cron expressions consisting of 5 fields."
)

if job is not None:
Expand Down
28 changes: 17 additions & 11 deletions python_modules/dagster/dagster/utils/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@


def is_valid_cron_string(cron_string: str) -> bool:
# dagster only recognizes standard cron strings that contains 5 parts
return croniter.is_valid(cron_string) and len(cron_string.split(" ")) == 5
if not croniter.is_valid(cron_string):
return False
expanded, _ = croniter.expand(cron_string)
# dagster only recognizes cron strings that resolve to 5 parts (e.g. not seconds resolution)
return len(expanded) == 5


def schedule_execution_time_iterator(
Expand All @@ -28,30 +31,33 @@ def schedule_execution_time_iterator(
# and matches the cron schedule
next_date = date_iter.get_prev(datetime.datetime)

cron_parts = cron_schedule.split(" ")

check.invariant(is_valid_cron_string(cron_schedule))

is_numeric = [part.isnumeric() for part in cron_parts]
cron_parts, _ = croniter.expand(cron_schedule)

is_numeric = [len(part) == 1 and part[0] != "*" for part in cron_parts]
is_wildcard = [len(part) == 1 and part[0] == "*" for part in cron_parts]

delta_fn = None
should_hour_change = False

# Special-case common intervals (hourly/daily/weekly/monthly) since croniter iteration can be
# much slower than adding a fixed interval
if cron_schedule.endswith(" * *") and all(is_numeric[0:3]): # monthly
if all(is_numeric[0:3]) and all(is_wildcard[3:]): # monthly
delta_fn = lambda d, num: d.add(months=num)
should_hour_change = False
elif (
all(is_numeric[0:2]) and is_numeric[4] and cron_parts[2] == "*" and cron_parts[3] == "*"
): # weekly
elif all(is_numeric[0:2]) and is_numeric[4] and all(is_wildcard[2:4]): # weekly
delta_fn = lambda d, num: d.add(weeks=num)
should_hour_change = False
elif all(is_numeric[0:2]) and cron_schedule.endswith(" * * *"): # daily
elif all(is_numeric[0:2]) and all(is_wildcard[2:]): # daily
delta_fn = lambda d, num: d.add(days=num)
should_hour_change = False
elif is_numeric[0] and cron_schedule.endswith(" * * * *"): # hourly
elif is_numeric[0] and all(is_wildcard[1:]): # hourly
delta_fn = lambda d, num: d.add(hours=num)
should_hour_change = True
else:
delta_fn = None
should_hour_change = False

if delta_fn:
# Use pendulums for intervals when possible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ def bad_cron_string_two(context):

with pytest.raises(DagsterInvalidDefinitionError, match="invalid cron schedule"):

@schedule(cron_schedule="@daily", pipeline_name="foo_pipeline")
@schedule(cron_schedule="* * * * * *", pipeline_name="foo_pipeline")
def bad_cron_string_three(context):
return {}

Expand Down Expand Up @@ -995,3 +995,26 @@ def foo_schedule():
assert len(requests) == 1
assert requests[0].run_config == FOO_CONFIG
assert requests[0].tags.get("foo") == "FOO"


def test_vixie_cronstring_schedule():
context_without_time = build_schedule_context()
start_date = datetime(year=2019, month=1, day=1)

@op
def foo_op(context):
pass

@job
def foo_job():
foo_op()

@schedule(cron_schedule="@daily", job=foo_job)
def foo_schedule():
yield RunRequest(run_key=None, run_config={}, tags={"foo": "FOO"})

# evaluate tick
execution_data = foo_schedule.evaluate_tick(context_without_time)
assert execution_data.run_requests
assert len(execution_data.run_requests) == 1
assert execution_data.run_requests[0].tags.get("foo") == "FOO"
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pytest
from dagster.check import CheckError
from dagster.seven.compat.pendulum import create_pendulum_time
from dagster.utils.schedules import schedule_execution_time_iterator

Expand All @@ -23,3 +25,63 @@ def test_cron_schedule_advances_past_dst():
year=2021, month=10, day=3, hour=4, tz="Australia/Sydney"
).timestamp()
)


def test_vixie_cronstring_schedule():
start_time = create_pendulum_time(
year=2022, month=2, day=21, hour=1, minute=30, second=1, tz="US/Pacific"
)

time_iter = schedule_execution_time_iterator(start_time.timestamp(), "@hourly", "US/Pacific")
for _i in range(6):
# 2:00, 3:00, 4:00, 5:00, 6:00, 7:00
next_time = next(time_iter)
assert (
next_time.timestamp()
== create_pendulum_time(year=2022, month=2, day=21, hour=7, tz="US/Pacific").timestamp()
)

time_iter = schedule_execution_time_iterator(start_time.timestamp(), "@daily", "US/Pacific")
for _i in range(6):
# 2/22, 2/23, 2/24, 2/25, 2/26, 2/27
next_time = next(time_iter)
assert (
next_time.timestamp()
== create_pendulum_time(year=2022, month=2, day=27, tz="US/Pacific").timestamp()
)

time_iter = schedule_execution_time_iterator(start_time.timestamp(), "@weekly", "US/Pacific")
for _i in range(6):
# 2/27, 3/6, 3/13, 3/20, 3/27, 4/3
next_time = next(time_iter)
assert (
next_time.timestamp()
== create_pendulum_time(year=2022, month=4, day=3, tz="US/Pacific").timestamp()
)

time_iter = schedule_execution_time_iterator(start_time.timestamp(), "@monthly", "US/Pacific")
for _i in range(6):
# 3/1, 4/1, 5/1, 6/1, 7/1, 8/1
next_time = next(time_iter)
assert (
next_time.timestamp()
== create_pendulum_time(year=2022, month=8, day=1, tz="US/Pacific").timestamp()
)

time_iter = schedule_execution_time_iterator(start_time.timestamp(), "@yearly", "US/Pacific")
for _i in range(6):
# 1/1/2023, 1/1/2024, 1/1/2025, 1/1/2026, 1/1/2027, 1/1/2028
next_time = next(time_iter)
assert (
next_time.timestamp()
== create_pendulum_time(year=2028, month=1, day=1, tz="US/Pacific").timestamp()
)


def test_invalid_cron_string():
start_time = create_pendulum_time(
year=2022, month=2, day=21, hour=1, minute=30, second=1, tz="US/Pacific"
)

with pytest.raises(CheckError):
next(schedule_execution_time_iterator(start_time.timestamp(), "* * * * * *", "US/Pacific"))

1 comment on commit 7fa5308

@vercel
Copy link

@vercel vercel bot commented on 7fa5308 Mar 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.