Skip to content

Commit

Permalink
Partition docs updates (#6814)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Feb 25, 2022
1 parent ef3ad62 commit 6098bf0
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 3 deletions.
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,45 @@ def continent_job():

It's common that, when you have a partitioned job, you want to run it on a schedule. For example, if your job has a partition for each date, you likely want to run that job every day, on the partition for that day.

The <PyObject object="build_schedule_from_partitioned_job"/> function allows you to construct a schedule from a partitioned job. The [Schedules concept page](/concepts/partitions-schedules-sensors/schedules#a-schedule-from-a-partitioned-job) describes how to use it.
The <PyObject object="build_schedule_from_partitioned_job"/> function allows you to construct a schedule from a date partitioned job. It creates a schedule with an interval that matches the spacing of your partition. If you wanted to create a schedule for `do_stuff_partitioned` job defined above, you could write:

```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_marker endbefore=end_marker
from dagster import build_schedule_from_partitioned_job, job


@job(config=my_partitioned_config)
def do_stuff_partitioned():
...


do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
do_stuff_partitioned,
)
```

Schedules can also be made from static partitioned jobs. If you wanted to make a schedule for the `continent_job` above that runs each partition, you could write:

```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_static_partition endbefore=end_static_partition
from dagster import schedule


@schedule(cron_schedule="0 0 * * *", job=continent_job)
def continent_schedule():
for c in CONTINENTS:
request = continent_job.run_request_for_partition(partition_key=c, run_key=c)
yield request
```

Or a schedule that will run a subselection of the partition

```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_single_partition endbefore=end_single_partition
@schedule(cron_schedule="0 0 * * *", job=continent_job)
def antarctica_schedule():
request = continent_job.run_request_for_partition(partition_key="Antarctica", run_key=None)
yield request
```

The [Schedules concept page](/concepts/partitions-schedules-sensors/schedules#a-schedule-from-a-partitioned-job) describes how construct both kinds of schedules in more detail.

## Testing

Expand Down
51 changes: 51 additions & 0 deletions docs/content/concepts/partitions-schedules-sensors/schedules.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ If you don't need access to the context parameter, you can omit it from the deco

### A schedule from a partitioned job

#### Time partitioned job

When you have a [partitioned job](/concepts/partitions-schedules-sensors/partitions) that's partitioned by time, you can use the <PyObject object="build_schedule_from_partitioned_job"/> function to construct a schedule for it whose interval matches the spacing of partitions in your job.

For example, if you have a daily partitioned job that fills in a date partition of a table each time it runs, you likely want to run that job every day.
Expand All @@ -104,6 +106,55 @@ However, you can use the `end_offset` parameter of <PyObject object="daily_parti

You can use the `minute_of_hour`, `hour_of_day`, `day_of_week`, and `day_of_month` parameters of `build_schedule_from_partitioned_job` to control the timing of the schedule. For example, if you have a job that's partitioned by date, and you set `minute_of_hour` to `30` and `hour_of_day` to `1`, the schedule would submit the run for partition `2020-04-01` at 1:30 AM on `2020-04-02`.

#### Static partitioned job

You can also create a schedule for a static partition. The Partitioned Jobs concepts page also includes an [example of how to define a static partitioned job](/concepts/partitions-schedules-sensors/partitions#defining-a-job-with-static-partitions). To define a schedule for a static partitioned job, we will construct a schedule from scratch, rather than using a helper function like `build_schedule_from_partitioned_job` this will allow more flexibility in determining which partitions should be run by the schedule.

For example, if we have the continents static partitioned job from the Partitioned Jobs concept page

```python file=/concepts/partitions_schedules_sensors/static_partitioned_job.py
from dagster import job, op, static_partitioned_config

CONTINENTS = ["Africa", "Antarctica", "Asia", "Europe", "North America", "Oceania", "South America"]


@static_partitioned_config(partition_keys=CONTINENTS)
def continent_config(partition_key: str):
return {"ops": {"continent_op": {"config": {"continent_name": partition_key}}}}


@op(config_schema={"continent_name": str})
def continent_op(context):
context.log.info(context.op_config["continent_name"])


@job(config=continent_config)
def continent_job():
continent_op()
```

We can write a schedule that will run this partition

```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_static_partition endbefore=end_static_partition
from dagster import schedule


@schedule(cron_schedule="0 0 * * *", job=continent_job)
def continent_schedule():
for c in CONTINENTS:
request = continent_job.run_request_for_partition(partition_key=c, run_key=c)
yield request
```

Or a schedule that will run a subselection of the partition

```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_single_partition endbefore=end_single_partition
@schedule(cron_schedule="0 0 * * *", job=continent_job)
def antarctica_schedule():
request = continent_job.run_request_for_partition(partition_key="Antarctica", run_key=None)
yield request
```

### Timezones

You can customize the timezone in which your schedule executes by setting the `execution_timezone` parameter on your schedule to any [tz timezone](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones). Schedules with no timezone set run in UTC.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,29 @@ def do_stuff_partitioned():
)

# end_marker

from .static_partitioned_job import continent_job, CONTINENTS

# start_static_partition
from dagster import schedule


@schedule(cron_schedule="0 0 * * *", job=continent_job)
def continent_schedule():
for c in CONTINENTS:
request = continent_job.run_request_for_partition(partition_key=c, run_key=c)
yield request


# end_static_partition

# start_single_partition


@schedule(cron_schedule="0 0 * * *", job=continent_job)
def antarctica_schedule():
request = continent_job.run_request_for_partition(partition_key="Antarctica", run_key=None)
yield request


# end_single_partition

1 comment on commit 6098bf0

@vercel
Copy link

@vercel vercel bot commented on 6098bf0 Feb 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.