diff --git a/docs/content/concepts/automation/schedules/examples.mdx b/docs/content/concepts/automation/schedules/examples.mdx index d32e64af51aa..8e18e679a64d 100644 --- a/docs/content/concepts/automation/schedules/examples.mdx +++ b/docs/content/concepts/automation/schedules/examples.mdx @@ -529,15 +529,17 @@ from dagster import ( asset, build_schedule_from_partitioned_job, define_asset_job, - HourlyPartitionsDefinition, + DailyPartitionsDefinition, ) +daily_partition = DailyPartitionsDefinition(start_date="2024-05-20") -@asset(partitions_def=HourlyPartitionsDefinition(start_date="2020-01-01-00:00")) -def hourly_asset(): ... +@asset(partitions_def=daily_partition) +def daily_asset(): ... -partitioned_asset_job = define_asset_job("partitioned_job", selection=[hourly_asset]) + +partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset]) asset_partitioned_schedule = build_schedule_from_partitioned_job( @@ -598,7 +600,7 @@ asset_partitioned_schedule = build_schedule_from_partitioned_job(
  • - +
  • @@ -617,12 +619,12 @@ This example demonstrates how to construct a schedule for a time-partitioned op from dagster import build_schedule_from_partitioned_job, job -@job(config=my_partitioned_config) -def do_stuff_partitioned(): ... +@job(config=partitioned_config) +def partitioned_op_job(): ... -do_stuff_partitioned_schedule = build_schedule_from_partitioned_job( - do_stuff_partitioned, +partitioned_op_schedule = build_schedule_from_partitioned_job( + partitioned_op_job, ) ``` diff --git a/docs/content/concepts/automation/schedules/partitioned-schedules.mdx b/docs/content/concepts/automation/schedules/partitioned-schedules.mdx new file mode 100644 index 000000000000..6ed402c47c94 --- /dev/null +++ b/docs/content/concepts/automation/schedules/partitioned-schedules.mdx @@ -0,0 +1,264 @@ +--- +title: "Constructing schedules from partitioned assets and jobs | Dagster Docs" +description: "Learn to construct schedules for your partitioned jobs." +--- + +# Constructing schedules from partitioned jobs + +In this guide, we'll walk you through how to construct schedules from partitioned [assets](/concepts/assets/software-defined-assets) and jobs. By the end, you'll be able to: + +- Construct a schedule for a time-partitioned job +- Customize a partitioned job's starting time +- Customize the most recent partition in a set +- Construct a schedule for a statically-partitioned job + +--- + +## Prerequisites + +To follow this guide, you need to be familiar with: + +- [Schedules](/concepts/partitions-schedules-sensors/schedules) +- [Partitions](/concepts/partitions-schedules-sensors/partitions) +- [Asset definitions](/concepts/assets/software-defined-assets) +- [Asset jobs](/concepts/assets/asset-jobs) and [op jobs](/concepts/ops-jobs-graphs/op-jobs) + +--- + +## Working with time-based partitions + +For jobs partitioned by time, you can use the to construct a schedule for the job. The schedule's interval will match the spacing of the partitions in the job. For example, if you have a daily partitioned job that fills in a date partition of a table each time it runs, you likely want to run that job every day. + +Refer to the following tabs for examples of asset and op-based jobs using to construct schedules: + + + + +#### Asset jobs + +Asset jobs are defined using . In this example, we created an asset job named `partitioned_job` and then constructed `asset_partitioned_schedule` by using : + +```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_partitioned_asset_schedule endbefore=end_partitioned_asset_schedule +from dagster import ( + asset, + build_schedule_from_partitioned_job, + define_asset_job, + DailyPartitionsDefinition, +) + +daily_partition = DailyPartitionsDefinition(start_date="2024-05-20") + + +@asset(partitions_def=daily_partition) +def daily_asset(): ... + + +partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset]) + + +asset_partitioned_schedule = build_schedule_from_partitioned_job( + partitioned_asset_job, +) +``` + + + + +#### Op jobs + +Op jobs are defined using the . In this example, we created a partitioned job named `partitioned_op_job` and then constructed `partitioned_op_schedule` using : + +```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_marker endbefore=end_marker +from dagster import build_schedule_from_partitioned_job, job + + +@job(config=partitioned_config) +def partitioned_op_job(): ... + + +partitioned_op_schedule = build_schedule_from_partitioned_job( + partitioned_op_job, +) +``` + + + + +### Customizing schedule timing + +The `minute_of_hour`, `hour_of_day`, `day_of_week`, and `day_of_month` parameters of `build_schedule_from_partitioned_job` can be used to control the timing of the schedule. + +Consider the following job: + +```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_partitioned_schedule_with_offset endbefore=end_partitioned_schedule_with_offset +from dagster import build_schedule_from_partitioned_job + +asset_partitioned_schedule = build_schedule_from_partitioned_job( + partitioned_asset_job, hour_of_day=1, minute_of_hour=30 +) +``` + +On May 20, 2024, the schedule will evaluate at 1:30 AM UTC and then start a run for the partition key of the previous day, `2024-05-19`. + +### Customizing the ending partition in a set + + + Heads up! The examples in this section use daily partitions, + but the same logic also applies to other time-based partitions, such as + hourly, weekly, and monthly partitions. + + +Each schedule tick of a partitioned job targets the latest partition in the partition set that exists as of the tick time. For example, consider a schedule that runs a daily-partitioned job. When the schedule runs on `2024-05-20`, it will target the most recent partition, which will correspond to the previous day: `2024-05-19`. + +| If a job runs on this date... | It will target this partition | +| ----------------------------- | ----------------------------- | +| 2024-05-20 | 2024-05-19 | +| 2024-05-21 | 2024-05-20 | +| 2024-05-22 | 2024-05-21 | + +This occurs because each partition is a **time window**. A time window is a set period of time with a start and an end time. The partition's key is the start of the time window, but the partition isn't included in the partition set until its time window has completed. Kicking off a run after the time window completes allows the run to process data for the entire time window. + +Continuing with the daily partition example, the `2024-05-20` partition would have the following start and end times: + +- **Start time** - `2024-05-20 00:00:00` +- **End time** - `2024-05-20 23:59:59` + +After `2024-05-20 23:59:59` passes, the time window is complete and Dagster will add a new `2024-05-20` partition to the partition set. At this point, the process will repeat with the next time window of `2024-05-21`. + +If you need to customize the ending, or most recent partition in a set, use the `end_offset` parameter in the partition's config: + +```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_offset_partition endbefore=end_offset_partition +from dagster import DailyPartitionsDefinition + +daily_partition_with_offset = DailyPartitionsDefinition( + start_date="2024-05-20", end_offset=-1 +) +``` + +Setting this parameter changes the partition that will be filled in at each schedule tick. Positive and negative integers are accepted, which will have the following effects: + +- **Positive numbers**, like `1`, cause the schedule to fill in the partition of the **current** hour/day/week/month +- **Negative numbers**, like `-1,` cause the schedule to fill in the partition of an **earlier** hour/day/week/month + +Generally, the calculation for `end_offset` can be expressed as: + +```shell +current_date - 1 type_of_partition + end_offset +``` + +Let's look at an example schedule that's partitioned by day and how different `end_offset` values would affect the most recent partition in the set. In this example, we're using a start date of `2024-05-20`: + +| End offset | Calculated as | Ending (most recent) partition | +| ------------ | ----------------------------- | --------------------------------------- | +| Offset of -1 | `2024-05-20 - 1 day + -1 day` | 2024-05-18 (2 days prior to start date) | +| No offset | `2024-05-20 - 1 day + 0 days` | 2024-05-19 (1 day prior to start date) | +| Offset of 1 | `2024-05-20 - 1 day + 1 day` | 2024-05-20 (start date) | + +--- + +## Working with static partitions + +Next, we'll demonstrate how to create a schedule for a job with a static partition. To do this, we'll construct the schedule from scratch using the decorator, rather than using a helper function like . This will allow more flexibility in determining which partitions should be run by the schedule. + +In this example, the job is partitioned by continent: + +```python file=/concepts/partitions_schedules_sensors/static_partitioned_asset_job.py startafter=start_job endbefore=end_job +from dagster import ( + AssetExecutionContext, + Config, + asset, + define_asset_job, + static_partitioned_config, +) + +CONTINENTS = [ + "Africa", + "Antarctica", + "Asia", + "Europe", + "North America", + "Oceania", + "South America", +] + + +@static_partitioned_config(partition_keys=CONTINENTS) +def continent_config(partition_key: str): + return {"ops": {"continents": {"config": {"continent_name": partition_key}}}} + + +class ContinentOpConfig(Config): + continent_name: str + + +@asset +def continents(context: AssetExecutionContext, config: ContinentOpConfig): + context.log.info(config.continent_name) + + +continent_job = define_asset_job( + name="continent_job", selection=[continents], config=continent_config +) +``` + +Using the decorator, we'll write a schedule that targets each partition, or `continent`: + +```python file=/concepts/partitions_schedules_sensors/static_partitioned_asset_job.py startafter=start_schedule_all_partitions endbefore=end_schedule_all_partitions +from dagster import RunRequest, schedule + + +@schedule(cron_schedule="0 0 * * *", job=continent_job) +def continent_schedule(): + for c in CONTINENTS: + yield RunRequest(run_key=c, partition_key=c) +``` + +If we only want to target the `Antarctica` partition, we can create a schedule like the following: + +```python file=/concepts/partitions_schedules_sensors/static_partitioned_asset_job.py startafter=start_single_partition endbefore=end_single_partition +from dagster import RunRequest, schedule + + +@schedule(cron_schedule="0 0 * * *", job=continent_job) +def antarctica_schedule(): + return RunRequest(partition_key="Antarctica") +``` + +--- + +## APIs in this guide + +| Name | Description | +| --------------------------------------------------------- | --------------------------------------------------------------------------------------------------- | +| | Decorator that defines a schedule that executes according to a given cron schedule. | +| | A function that constructs a schedule whose interval matches the partitioning of a partitioned job. | +| | A class that represents all the information required to launch a single run. | +| | A function for defining a job from a selection of assets. | +| | The decorator used to define a job. | + +--- + +## Related + + + + + + + + diff --git a/docs/content/concepts/partitions-schedules-sensors/partitioning-ops.mdx b/docs/content/concepts/partitions-schedules-sensors/partitioning-ops.mdx index aca9a6110a21..dbc4a3a5d966 100644 --- a/docs/content/concepts/partitions-schedules-sensors/partitioning-ops.mdx +++ b/docs/content/concepts/partitions-schedules-sensors/partitioning-ops.mdx @@ -96,7 +96,7 @@ from datetime import datetime @daily_partitioned_config(start_date=datetime(2020, 1, 1)) -def my_partitioned_config(start: datetime, _end: datetime): +def partitioned_config(start: datetime, _end: datetime): return { "ops": { "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}} @@ -107,8 +107,8 @@ def my_partitioned_config(start: datetime, _end: datetime): Then you can build a job that uses the `PartitionedConfig` by supplying it to the `config` argument when you construct the job: ```python file=/concepts/partitions_schedules_sensors/partitioned_job.py startafter=start_partitioned_job endbefore=end_partitioned_job -@job(config=my_partitioned_config) -def do_stuff_partitioned(): +@job(config=partitioned_config) +def partitioned_op_job(): process_data_for_date() ``` @@ -157,42 +157,7 @@ def continent_job(): Running a partitioned job on a schedule is a common use case. For example, if your job has a partition for each date, you likely want to run that job every day, on the partition for that day. -The function allows you to construct a schedule from a date partitioned job. It creates a schedule with an interval which matches the spacing of your partition. If you wanted to create a schedule for `do_stuff_partitioned` job defined above, you could write: - -```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_marker endbefore=end_marker -from dagster import build_schedule_from_partitioned_job, job - - -@job(config=my_partitioned_config) -def do_stuff_partitioned(): ... - - -do_stuff_partitioned_schedule = build_schedule_from_partitioned_job( - do_stuff_partitioned, -) -``` - -Schedules can also be made from static partitioned jobs. If you wanted to make a schedule for the `continent_job` above that runs each partition, you could write: - -```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_static_partition endbefore=end_static_partition -from dagster import schedule, RunRequest - - -@schedule(cron_schedule="0 0 * * *", job=continent_job) -def continent_schedule(): - for c in CONTINENTS: - yield RunRequest(run_key=c, partition_key=c) -``` - -Or a schedule that will run a subselection of the partition: - -```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_single_partition endbefore=end_single_partition -@schedule(cron_schedule="0 0 * * *", job=continent_job) -def antarctica_schedule(): - return RunRequest(partition_key="Antarctica") -``` - -Refer to the [Schedules documentation](/concepts/partitions-schedules-sensors/schedules#schedules-from-partitioned-assets-and-jobs) for more info about constructing both schedule types. +Refer to the [Schedule documentation](/concepts/partitions-schedules-sensors/schedules#schedules-from-partitioned-assets-and-jobs) for more info about constructing both schedules for asset and op-based jobs. --- diff --git a/docs/content/concepts/partitions-schedules-sensors/schedules.mdx b/docs/content/concepts/partitions-schedules-sensors/schedules.mdx index ceb8aa78fc37..0528a5745b84 100644 --- a/docs/content/concepts/partitions-schedules-sensors/schedules.mdx +++ b/docs/content/concepts/partitions-schedules-sensors/schedules.mdx @@ -107,12 +107,12 @@ Having defined a date-partitioned job, you can construct a schedule for it using from dagster import build_schedule_from_partitioned_job, job -@job(config=my_partitioned_config) -def do_stuff_partitioned(): ... +@job(config=partitioned_config) +def partitioned_op_job(): ... -do_stuff_partitioned_schedule = build_schedule_from_partitioned_job( - do_stuff_partitioned, +partitioned_op_schedule = build_schedule_from_partitioned_job( + partitioned_op_job, ) ``` @@ -123,15 +123,17 @@ from dagster import ( asset, build_schedule_from_partitioned_job, define_asset_job, - HourlyPartitionsDefinition, + DailyPartitionsDefinition, ) +daily_partition = DailyPartitionsDefinition(start_date="2024-05-20") -@asset(partitions_def=HourlyPartitionsDefinition(start_date="2020-01-01-00:00")) -def hourly_asset(): ... +@asset(partitions_def=daily_partition) +def daily_asset(): ... -partitioned_asset_job = define_asset_job("partitioned_job", selection=[hourly_asset]) + +partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset]) asset_partitioned_schedule = build_schedule_from_partitioned_job( diff --git a/docs/content/concepts/partitions-schedules-sensors/testing-partitions.mdx b/docs/content/concepts/partitions-schedules-sensors/testing-partitions.mdx index 6cc92d24f90a..4d3f160baa08 100644 --- a/docs/content/concepts/partitions-schedules-sensors/testing-partitions.mdx +++ b/docs/content/concepts/partitions-schedules-sensors/testing-partitions.mdx @@ -52,8 +52,8 @@ def test_my_partitioned_config(): } # assert that the output of the decorated function is valid configuration for the - # do_stuff_partitioned job - assert validate_run_config(do_stuff_partitioned, run_config) + # partitioned_op_job job + assert validate_run_config(partitioned_op_job, run_config) ``` If you want to test that a creates the partitions you expect, use the `get_partition_keys` or `get_run_config_for_partition_key` functions: @@ -99,7 +99,7 @@ def test_my_offset_partitioned_config(): assert keys[0] == "2020-01-01" assert keys[1] == "2020-01-02" - # test that the run_config for a partition is valid for do_stuff_partitioned + # test that the run_config for a partition is valid for partitioned_op_job run_config = my_offset_partitioned_config.get_run_config_for_partition_key(keys[0]) assert validate_run_config(do_more_stuff_partitioned, run_config) @@ -120,8 +120,8 @@ def test_my_offset_partitioned_config(): To run a partitioned job in-process on a particular partition, supply a value for the `partition_key` argument of . For example: ```python file=/concepts/partitions_schedules_sensors/partitioned_job_test.py startafter=start endbefore=end -def test_do_stuff_partitioned(): - assert do_stuff_partitioned.execute_in_process(partition_key="2020-01-01").success +def test_partitioned_op_job(): + assert partitioned_op_job.execute_in_process(partition_key="2020-01-01").success ``` --- diff --git a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_config_test.py b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_config_test.py index 14c5efd877a3..a2bd92b2121c 100644 --- a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_config_test.py +++ b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_config_test.py @@ -2,7 +2,7 @@ from docs_snippets.concepts.partitions_schedules_sensors.partitioned_job import ( - do_stuff_partitioned, + partitioned_op_job, ) from dagster import job, op @@ -29,8 +29,8 @@ def test_my_partitioned_config(): } # assert that the output of the decorated function is valid configuration for the - # do_stuff_partitioned job - assert validate_run_config(do_stuff_partitioned, run_config) + # partitioned_op_job job + assert validate_run_config(partitioned_op_job, run_config) # end_partition_config @@ -76,7 +76,7 @@ def test_my_offset_partitioned_config(): assert keys[0] == "2020-01-01" assert keys[1] == "2020-01-02" - # test that the run_config for a partition is valid for do_stuff_partitioned + # test that the run_config for a partition is valid for partitioned_op_job run_config = my_offset_partitioned_config.get_run_config_for_partition_key(keys[0]) assert validate_run_config(do_more_stuff_partitioned, run_config) diff --git a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_job.py b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_job.py index 7b5553d9c91e..33708e1d13d0 100644 --- a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_job.py +++ b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_job.py @@ -14,7 +14,7 @@ def process_data_for_date(context: OpExecutionContext): @daily_partitioned_config(start_date=datetime(2020, 1, 1)) -def my_partitioned_config(start: datetime, _end: datetime): +def partitioned_config(start: datetime, _end: datetime): return { "ops": { "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}} @@ -26,8 +26,8 @@ def my_partitioned_config(start: datetime, _end: datetime): # start_partitioned_job -@job(config=my_partitioned_config) -def do_stuff_partitioned(): +@job(config=partitioned_config) +def partitioned_op_job(): process_data_for_date() diff --git a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_job_test.py b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_job_test.py index 16ce1906972e..31e917deff9f 100644 --- a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_job_test.py +++ b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_job_test.py @@ -1,13 +1,13 @@ # ruff: isort: skip_file from docs_snippets.concepts.partitions_schedules_sensors.partitioned_job import ( - do_stuff_partitioned, + partitioned_op_job, ) # start -def test_do_stuff_partitioned(): - assert do_stuff_partitioned.execute_in_process(partition_key="2020-01-01").success +def test_partitioned_op_job(): + assert partitioned_op_job.execute_in_process(partition_key="2020-01-01").success # end diff --git a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/schedule_from_partitions.py b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/schedule_from_partitions.py index 2da237673f11..a4752b3fab6f 100644 --- a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/schedule_from_partitions.py +++ b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/schedule_from_partitions.py @@ -1,17 +1,17 @@ # ruff: isort: skip_file -from .partitioned_job import my_partitioned_config +from .partitioned_job import partitioned_config # start_marker from dagster import build_schedule_from_partitioned_job, job -@job(config=my_partitioned_config) -def do_stuff_partitioned(): ... +@job(config=partitioned_config) +def partitioned_op_job(): ... -do_stuff_partitioned_schedule = build_schedule_from_partitioned_job( - do_stuff_partitioned, +partitioned_op_schedule = build_schedule_from_partitioned_job( + partitioned_op_job, ) # end_marker @@ -22,15 +22,17 @@ def do_stuff_partitioned(): ... asset, build_schedule_from_partitioned_job, define_asset_job, - HourlyPartitionsDefinition, + DailyPartitionsDefinition, ) +daily_partition = DailyPartitionsDefinition(start_date="2024-05-20") -@asset(partitions_def=HourlyPartitionsDefinition(start_date="2020-01-01-00:00")) -def hourly_asset(): ... +@asset(partitions_def=daily_partition) +def daily_asset(): ... -partitioned_asset_job = define_asset_job("partitioned_job", selection=[hourly_asset]) + +partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset]) asset_partitioned_schedule = build_schedule_from_partitioned_job( @@ -63,3 +65,22 @@ def antarctica_schedule(): # end_single_partition + +# start_offset_partition +from dagster import DailyPartitionsDefinition + +daily_partition_with_offset = DailyPartitionsDefinition( + start_date="2024-05-20", end_offset=-1 +) + + +# end_offset_partition + +# start_partitioned_schedule_with_offset +from dagster import build_schedule_from_partitioned_job + +asset_partitioned_schedule = build_schedule_from_partitioned_job( + partitioned_asset_job, hour_of_day=1, minute_of_hour=30 +) + +# end_partitioned_schedule_with_offset diff --git a/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/static_partitioned_asset_job.py b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/static_partitioned_asset_job.py new file mode 100644 index 000000000000..3f6eca128230 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/static_partitioned_asset_job.py @@ -0,0 +1,63 @@ +# start_job + +from dagster import ( + AssetExecutionContext, + Config, + asset, + define_asset_job, + static_partitioned_config, +) + +CONTINENTS = [ + "Africa", + "Antarctica", + "Asia", + "Europe", + "North America", + "Oceania", + "South America", +] + + +@static_partitioned_config(partition_keys=CONTINENTS) +def continent_config(partition_key: str): + return {"ops": {"continents": {"config": {"continent_name": partition_key}}}} + + +class ContinentOpConfig(Config): + continent_name: str + + +@asset +def continents(context: AssetExecutionContext, config: ContinentOpConfig): + context.log.info(config.continent_name) + + +continent_job = define_asset_job( + name="continent_job", selection=[continents], config=continent_config +) + +# end_job + +# start_schedule_all_partitions +from dagster import RunRequest, schedule + + +@schedule(cron_schedule="0 0 * * *", job=continent_job) +def continent_schedule(): + for c in CONTINENTS: + yield RunRequest(run_key=c, partition_key=c) + + +# end_schedule_all_partitions + +# start_single_partition +from dagster import RunRequest, schedule + + +@schedule(cron_schedule="0 0 * * *", job=continent_job) +def antarctica_schedule(): + return RunRequest(partition_key="Antarctica") + + +# end_single_partition diff --git a/examples/docs_snippets/docs_snippets_tests/concepts_tests/partitions_schedules_sensors_tests/test_partitioned_job.py b/examples/docs_snippets/docs_snippets_tests/concepts_tests/partitions_schedules_sensors_tests/test_partitioned_job.py index 64a154a7c99c..892e9c8ed002 100644 --- a/examples/docs_snippets/docs_snippets_tests/concepts_tests/partitions_schedules_sensors_tests/test_partitioned_job.py +++ b/examples/docs_snippets/docs_snippets_tests/concepts_tests/partitions_schedules_sensors_tests/test_partitioned_job.py @@ -1,7 +1,7 @@ from docs_snippets.concepts.partitions_schedules_sensors.partitioned_job import ( - do_stuff_partitioned, + partitioned_op_job, ) def test_do_stuff(): - assert do_stuff_partitioned.execute_in_process(partition_key="2021-05-01").success + assert partitioned_op_job.execute_in_process(partition_key="2021-05-01").success diff --git a/examples/docs_snippets/docs_snippets_tests/concepts_tests/partitions_schedules_sensors_tests/test_schedule_from_partitions.py b/examples/docs_snippets/docs_snippets_tests/concepts_tests/partitions_schedules_sensors_tests/test_schedule_from_partitions.py index d5c94ddc13b1..4e4ccee173b4 100644 --- a/examples/docs_snippets/docs_snippets_tests/concepts_tests/partitions_schedules_sensors_tests/test_schedule_from_partitions.py +++ b/examples/docs_snippets/docs_snippets_tests/concepts_tests/partitions_schedules_sensors_tests/test_schedule_from_partitions.py @@ -1,10 +1,10 @@ from docs_snippets.concepts.partitions_schedules_sensors.partitioned_job import ( - do_stuff_partitioned, + partitioned_op_job, ) from docs_snippets.concepts.partitions_schedules_sensors.schedule_from_partitions import ( - do_stuff_partitioned_schedule, + partitioned_op_schedule, ) def test_build_schedule_from_partitioned_job(): - assert do_stuff_partitioned_schedule.job_name == do_stuff_partitioned.name + assert partitioned_op_schedule.job_name == partitioned_op_job.name