Time window partitions with custom minute, hour, day offsets (#7125)
jamiedemaria committed Mar 28, 2022
1 parent e642465 commit 8378c0c
Showing 9 changed files with 392 additions and 64 deletions.
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/searchindex.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

24 changes: 21 additions & 3 deletions docs/content/concepts/partitions-schedules-sensors/partitions.mdx
Original file line number Diff line number Diff line change
@@ -94,7 +94,7 @@ def do_stuff_partitioned():
    process_data_for_date()
```

In addition to the <PyObject object="daily_partitioned_config" decorator /> decorator, Dagster also provides <PyObject object="monthly_partitioned_config" decorator />, <PyObject object="weekly_partitioned_config" decorator />, <PyObject object="hourly_partitioned_config" decorator />.
In addition to the <PyObject object="daily_partitioned_config" decorator /> decorator, Dagster also provides <PyObject object="monthly_partitioned_config" decorator />, <PyObject object="weekly_partitioned_config" decorator />, <PyObject object="hourly_partitioned_config" decorator />. See the API docs for each of these decorators for more information on how partitions are built based on different `start_date`, `minute_offset`, `hour_offset`, and `day_offset` inputs.

### Defining a Job with Static Partitions

@@ -183,12 +183,30 @@ Invoking a <PyObject object="PartitionedConfig" /> object will directly invoke t

If you want to check whether the generated run config is valid for the config of job, you can use the <PyObject object="validate_run_config" /> function.

```python file=/concepts/partitions_schedules_sensors/partitioned_config_test.py startafter=start endbefore=end
```python file=/concepts/partitions_schedules_sensors/partitioned_config_test.py startafter=start_partition_config endbefore=end_partition_config
from dagster import validate_run_config, daily_partitioned_config
from datetime import datetime


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }


def test_my_partitioned_config():
    # assert that the decorated function returns the expected output
    run_config = my_partitioned_config(datetime(2020, 1, 3), datetime(2020, 1, 4))
    assert run_config == {
        "ops": {"process_data_for_date": {"config": {"date": "2020-01-03"}}}
    }

    # assert that the output of the decorated function is valid configuration for the
    # do_stuff_partitioned job
    assert validate_run_config(do_stuff_partitioned, run_config)
```

### Testing Partitioned Jobs
@@ -206,7 +224,7 @@ def test_do_stuff_partitioned():

In Dagit, you can view runs by partition in the Partitions tab of a Job page.

In the "Run Matrix", each column corresponds to one of the partitions in the job. Each row corresponds to one of the steps in the job.
In the "Run Matrix", each column corresponds to one of the partitions in the job. The time listed corresponds to the start time of the partition. Each row corresponds to one of the steps in the job.

<!-- This was generated from go/prod -->
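
The docs change above mentions `start_date`, `minute_offset`, and `hour_offset`, and notes that the Run Matrix lists each partition by its start time. As a rough illustration only, the sketch below computes daily time windows from those inputs using the standard library. The helper `sketch_daily_windows` is hypothetical and is not Dagster's implementation; it assumes the first window begins at the first offset boundary at or after `start_date`, and that only fully elapsed windows count as partitions.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def sketch_daily_windows(
    start_date: datetime,
    current_time: datetime,
    minute_offset: int = 0,
    hour_offset: int = 0,
) -> List[Tuple[datetime, datetime]]:
    """Hypothetical helper: list (start, end) windows of completed daily partitions.

    Assumption: each window starts at hour_offset:minute_offset and lasts one day.
    """
    # Move the boundary to the requested time of day on the start date.
    first = start_date.replace(
        hour=hour_offset, minute=minute_offset, second=0, microsecond=0
    )
    # If that boundary falls before start_date, use the next day's boundary.
    if first < start_date:
        first += timedelta(days=1)

    windows = []
    window_start = first
    # Only include windows whose end time has already passed.
    while window_start + timedelta(days=1) <= current_time:
        windows.append((window_start, window_start + timedelta(days=1)))
        window_start += timedelta(days=1)
    return windows


windows = sketch_daily_windows(
    start_date=datetime(2022, 3, 12),
    current_time=datetime(2022, 3, 15),
    minute_offset=15,
    hour_offset=16,
)
# Label each partition by its window start, mirroring the Run Matrix note above.
labels = [start.strftime("%Y-%m-%d-%H:%M") for start, _end in windows]
```

With these inputs the sketch yields two windows, each labeled by its start time. The real decorators may handle time zones and boundary cases differently, so the API docs remain the reference.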

Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
)


# start
# start_partition_config
from dagster import validate_run_config, daily_partitioned_config
from datetime import datetime

@@ -32,4 +32,4 @@ def test_my_partitioned_config():
    assert validate_run_config(do_stuff_partitioned, run_config)


# end
# end_partition_config
Original file line number Diff line number Diff line change
@@ -8,8 +8,8 @@
    Partition,
    PartitionSetDefinition,
    PartitionedConfig,
    ScheduleTimeBasedPartitionsDefinition,
    ScheduleType,
    get_cron_schedule,
)
from .run_request import SkipReason
from .schedule_definition import (
@@ -67,23 +67,19 @@ def build_schedule_from_partitioned_job(
    )

    if partitions_def.schedule_type == ScheduleType.MONTHLY:
        execution_day = check.opt_int_param(day_of_month, "day_of_month", default=1)
        default = partitions_def.day_offset or 1
        execution_day = check.opt_int_param(day_of_month, "day_of_month", default=default)
    elif partitions_def.schedule_type == ScheduleType.WEEKLY:
        execution_day = check.opt_int_param(day_of_week, "day_of_week", default=0)
        default = partitions_def.day_offset or 0
        execution_day = check.opt_int_param(day_of_week, "day_of_week", default=default)
    else:
        execution_day = 0

    schedule_partitions = ScheduleTimeBasedPartitionsDefinition(
        schedule_type=partitions_def.schedule_type,
        start=partitions_def.start,
        execution_time=execution_time,
        execution_day=execution_day,
        offset=1,
    )
    cron_schedule = get_cron_schedule(partitions_def.schedule_type, execution_time, execution_day)

    schedule_def = partition_set.create_schedule_definition(
        schedule_name=check.opt_str_param(name, "name", f"{job.name}_schedule"),
        cron_schedule=cron_schedule,
        cron_schedule=schedule_partitions.get_cron_schedule(),
        partition_selector=latest_window_partition_selector,
        execution_timezone=partitions_def.timezone,
        description=description,
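
The hunk above makes the default execution day fall back to the partition definition's `day_offset`, then builds a cron string from the schedule type, execution time, and execution day. The body of `get_cron_schedule` is not shown in this diff, so the following is only a sketch of how such a mapping might look with standard five-field cron syntax. `SketchScheduleType`, `sketch_execution_day`, and `sketch_cron_schedule` are hypothetical names, not Dagster APIs.

```python
from datetime import time
from enum import Enum
from typing import Optional


class SketchScheduleType(Enum):
    """Hypothetical stand-in for the ScheduleType enum referenced in the diff."""

    HOURLY = "HOURLY"
    DAILY = "DAILY"
    WEEKLY = "WEEKLY"
    MONTHLY = "MONTHLY"


def sketch_execution_day(
    schedule_type: SketchScheduleType,
    explicit_day: Optional[int] = None,
    day_offset: int = 0,
) -> int:
    """Mirror the fallback in the hunk: explicit argument, else day_offset, else a default."""
    if schedule_type is SketchScheduleType.MONTHLY:
        default = day_offset or 1  # a zero offset falls back to day 1 of the month
    elif schedule_type is SketchScheduleType.WEEKLY:
        default = day_offset or 0  # a zero offset falls back to weekday 0
    else:
        return 0  # hourly and daily schedules do not use an execution day
    return explicit_day if explicit_day is not None else default


def sketch_cron_schedule(
    schedule_type: SketchScheduleType, execution_time: time, execution_day: int = 0
) -> str:
    """Assumed mapping to 'minute hour day-of-month month day-of-week' cron fields."""
    minute, hour = execution_time.minute, execution_time.hour
    if schedule_type is SketchScheduleType.HOURLY:
        return f"{minute} * * * *"
    if schedule_type is SketchScheduleType.DAILY:
        return f"{minute} {hour} * * *"
    if schedule_type is SketchScheduleType.WEEKLY:
        return f"{minute} {hour} * * {execution_day}"
    if schedule_type is SketchScheduleType.MONTHLY:
        return f"{minute} {hour} {execution_day} * *"
    raise ValueError(f"Unsupported schedule type: {schedule_type}")


# A monthly partition definition with day_offset=5 and no explicit day_of_month.
day = sketch_execution_day(SketchScheduleType.MONTHLY, explicit_day=None, day_offset=5)
monthly_cron = sketch_cron_schedule(SketchScheduleType.MONTHLY, time(2, 30), day)
```

Separating the day fallback from the cron formatting keeps each piece easy to test on its own, which is roughly the shape of the refactor in the hunk.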