From aa6786b63e8f3a3a288e934f7fac56f4a887f824 Mon Sep 17 00:00:00 2001
From: Erin Cochran
Date: Fri, 10 May 2024 12:47:20 -0400
Subject: [PATCH] [docs] - Add examples page for Schedules (DOC-191) (#21735)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Summary & Motivation

This PR experiments with adding an **Examples** page for Schedules. It pulls some examples from the current Schedules concept page and simplifies the explanations to make them easier to skim.

The focus of this reference is not to teach users how to do X, but to quickly demonstrate, provide relevant links to jump to, and let them get back to work.

I've added a summary table to each example that contains:

- Any notes, tips, etc. about the example
- Links to related documentation. For example, where the code is used and has a more learning-focused explanation
- A list of APIs used in each example

![Screenshot 2024-05-08 at 5 27 36 PM](https://github.com/dagster-io/dagster/assets/16615212/a9c04055-5a0b-4a3d-b294-b011c7fa337b)

## How I Tested These Changes

👀
---
 .../automating-assets-schedules-jobs.mdx      |  12 +-
 .../automation/schedules/examples.mdx         | 751 ++++++++++++++++++
 2 files changed, 757 insertions(+), 6 deletions(-)
 create mode 100644 docs/content/concepts/automation/schedules/examples.mdx

diff --git a/docs/content/concepts/automation/schedules/automating-assets-schedules-jobs.mdx b/docs/content/concepts/automation/schedules/automating-assets-schedules-jobs.mdx
index 528ac4e8f793..216b6a72216d 100644
--- a/docs/content/concepts/automation/schedules/automating-assets-schedules-jobs.mdx
+++ b/docs/content/concepts/automation/schedules/automating-assets-schedules-jobs.mdx
@@ -180,12 +180,12 @@ dagster schedule stop

 You can set the schedule's default status using `DefaultScheduleStatus.RUNNING` in the schedule's `ScheduleDefinition`:

-```python file=concepts/partitions_schedules_sensors/schedules/basic_asset_schedule.py lines=43-47
-# start_definitions
-defs = Definitions(
-    assets=[orders_asset, users_asset],
-    jobs=[ecommerce_job],
-    schedules=[ecommerce_schedule],
+```python file=concepts/partitions_schedules_sensors/schedules/basic_asset_schedule.py startafter=start_schedule endbefore=end_schedule
+ecommerce_schedule = ScheduleDefinition(
+    job=ecommerce_job,
+    cron_schedule="15 5 * * 1-5",
+    default_status=DefaultScheduleStatus.RUNNING,
+)
 ```

diff --git a/docs/content/concepts/automation/schedules/examples.mdx b/docs/content/concepts/automation/schedules/examples.mdx
new file mode 100644
index 000000000000..e16c476575f6
--- /dev/null
+++ b/docs/content/concepts/automation/schedules/examples.mdx
@@ -0,0 +1,751 @@
+---
+title: "Schedule examples | Dagster Docs"
+description: "Examples focused on Dagster schedules."
+---
+
+# Schedule examples
+
+This reference contains a variety of examples using Dagster [schedules](/concepts/partitions-schedules-sensors/schedules). Each example contains:
+
+- A summary
+- Additional notes
+- Links to relevant documentation
+- A list of the APIs used in the example
+
+---
+
+## Defining basic schedules
+
+The following examples demonstrate how to define some basic schedules.
+
+This example demonstrates how to define a schedule using `ScheduleDefinition` that will run a job every day at midnight. While this example uses [op jobs](/concepts/ops-jobs-graphs/jobs) (`@job`), the same approach will work with [asset jobs](/concepts/assets/asset-jobs) (`define_asset_job`).
+
+```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_basic_schedule endbefore=end_basic_schedule
+@job
+def my_job(): ...
+
+
+basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")
+```
+
+ Notes + + The cron_schedule argument accepts standard{" "} + cron expressions. If + your croniter dependency's version is{" "} + >= 1.0.12, the argument will also accept the following: +
    +
  • + @daily +
  • +
  • + @hourly +
  • +
  • + @monthly +
  • +
+
+ Related docs + + +
+ APIs in this example + + ,{" "} + +
+ +
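The shortcut strings listed in the notes above follow the conventional cron nicknames. The sketch below is a plain-Python illustration of what each one stands for as a five-field expression. It is not how Dagster or `croniter` parse them; `CRON_SHORTCUTS` and `expand_cron_shortcut` are hypothetical names introduced only for this example.

```python
# Hypothetical lookup table: conventional meaning of each cron shortcut.
CRON_SHORTCUTS = {
    "@hourly": "0 * * * *",   # minute 0 of every hour
    "@daily": "0 0 * * *",    # midnight every day
    "@monthly": "0 0 1 * *",  # midnight on the first day of each month
}


def expand_cron_shortcut(expr: str) -> str:
    """Return the five-field form of a shortcut, or the input unchanged."""
    cleaned = expr.strip()
    return CRON_SHORTCUTS.get(cleaned, cleaned)


print(expand_cron_shortcut("@daily"))        # 0 0 * * *
print(expand_cron_shortcut("15 5 * * 1-5"))  # 15 5 * * 1-5
```

Read this way, `cron_schedule="@daily"` and `cron_schedule="0 0 * * *"` describe the same midnight cadence.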
+
+This example demonstrates how to define a schedule using the `@schedule` decorator, which provides more flexibility than `ScheduleDefinition`. For example, you can [configure job behavior based on its scheduled run time](#configuring-job-behavior-based-on-scheduled-run-time) or [emit log messages](#emitting-log-messages-from-schedule-evaluations).
+
+```python
+@schedule(job=my_job, cron_schedule="0 0 * * *")
+def basic_schedule():
+    # things the schedule does, like returning a RunRequest or SkipReason
+    ...
+```
+
+ Notes + + The decorator's cron_schedule argument accepts standard{" "} + cron expressions. If + your croniter dependency's version is{" "} + >= 1.0.12, the argument will also accept the following: +
    +
  • + @daily +
  • +
  • + @hourly +
  • +
  • + @monthly +
  • +
+
+ Related docs + + +
+ APIs in this example + + ,{" "} + +
+ +
+
+
+---
+
+## Emitting log messages from schedule evaluations
+
+This example demonstrates how to emit log messages from a schedule during its evaluation function. These logs will be visible in the UI when you inspect a tick in the schedule's tick history.
+
+```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_schedule_logging endbefore=end_schedule_logging
+@schedule(job=my_job, cron_schedule="* * * * *")
+def logs_then_skips(context):
+    context.log.info("Logging from a schedule!")
+    return SkipReason("Nothing to do")
+```
+
+ Notes + + Schedule logs are stored in your{" "} + + Dagster instance's compute log storage + + . You should ensure that your compute log storage is configured to view your + schedule logs. +
+ Related docs + + +
+ APIs in this example + + ,{" "} + ,{" "} + +
+
+---
+
+## Using resources in schedules
+
+This example demonstrates how to use resources in schedules. To specify a resource dependency, add the resource as a type-annotated parameter to the schedule's function.
+
+```python file=/concepts/resources/pythonic_resources.py startafter=start_new_resource_on_schedule endbefore=end_new_resource_on_schedule dedent=4
+from dagster import (
+    schedule,
+    ScheduleEvaluationContext,
+    ConfigurableResource,
+    job,
+    RunRequest,
+    RunConfig,
+    Definitions,
+)
+from datetime import datetime
+from typing import List
+
+class DateFormatter(ConfigurableResource):
+    format: str
+
+    def strftime(self, dt: datetime) -> str:
+        return dt.strftime(self.format)
+
+@job
+def process_data(): ...
+
+@schedule(job=process_data, cron_schedule="* * * * *")
+def process_data_schedule(
+    context: ScheduleEvaluationContext,
+    date_formatter: DateFormatter,
+):
+    formatted_date = date_formatter.strftime(context.scheduled_execution_time)
+
+    return RunRequest(
+        run_key=None,
+        tags={"date": formatted_date},
+    )
+
+defs = Definitions(
+    jobs=[process_data],
+    schedules=[process_data_schedule],
+    resources={"date_formatter": DateFormatter(format="%Y-%m-%d")},
+)
+```
+
+ Notes + + All Dagster definitions, including schedules and resources, must be + attached to a `Definitions` call. +
+ Related docs + + +
+ APIs in this example + +
    +
  • + +
  • +
  • + +
  • +
  • + +
  • +
  • + +
  • +
  • + +
  • +
  • + +
  • +
  • + +
  • +
+
+
+---
+
+## Configuring job behavior based on scheduled run time
+
+This example demonstrates how to use run config to vary the behavior of a job based on its scheduled run time.
+
+```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_run_config_schedule endbefore=end_run_config_schedule
+@op(config_schema={"scheduled_date": str})
+def configurable_op(context: OpExecutionContext):
+    context.log.info(context.op_config["scheduled_date"])
+
+
+@job
+def configurable_job():
+    configurable_op()
+
+
+@schedule(job=configurable_job, cron_schedule="0 0 * * *")
+def configurable_job_schedule(context: ScheduleEvaluationContext):
+    scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
+    return RunRequest(
+        run_key=None,
+        run_config={
+            "ops": {"configurable_op": {"config": {"scheduled_date": scheduled_date}}}
+        },
+        tags={"date": scheduled_date},
+    )
+```
+
+ Notes +
+ Related docs + + Op jobs +
+ APIs in this example + + , ,{" "} + ,{" "} + ,{" "} + +
+
+---
+
+## Customizing execution timezones
+
+This example demonstrates how to customize the timezone a schedule executes in. The schedule in this example will execute every day at 9AM in US/Pacific time.
+
+```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_timezone endbefore=end_timezone
+my_timezone_schedule = ScheduleDefinition(
+    job=my_job, cron_schedule="0 9 * * *", execution_timezone="US/Pacific"
+)
+```
+
+ Notes + +
    +
  • + The `@schedule` decorator also accepts + the execution_timezone argument +
  • +
  • Schedules without a set timezone will run in UTC.
  • +
  • + Schedules from partitioned jobs execute in the timezone defined on + the partitioned config +
  • +
+
+ APIs in this example + + +
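To make the effect of `execution_timezone="US/Pacific"` concrete, the following standard-library sketch computes when a daily 9:00 AM US/Pacific run falls in UTC at two points in the year. It does not use Dagster's scheduler; `next_daily_run` is a hypothetical helper, and it assumes the system time zone database provides `US/Pacific`.

```python
from datetime import datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo


def next_daily_run(now: datetime, hour: int, tz_name: str) -> datetime:
    """Return the next occurrence of hour:00 wall-clock time in tz_name."""
    tz = ZoneInfo(tz_name)
    local_now = now.astimezone(tz)
    candidate = datetime.combine(local_now.date(), time(hour), tzinfo=tz)
    if candidate <= local_now:
        # Today's slot has already passed, so use tomorrow's.
        next_day = local_now.date() + timedelta(days=1)
        candidate = datetime.combine(next_day, time(hour), tzinfo=tz)
    return candidate


winter = next_daily_run(datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc), 9, "US/Pacific")
summer = next_daily_run(datetime(2024, 7, 15, 12, 0, tzinfo=timezone.utc), 9, "US/Pacific")

print(winter.astimezone(timezone.utc))  # 17:00 UTC while Pacific time is UTC-8
print(summer.astimezone(timezone.utc))  # 16:00 UTC while Pacific time is UTC-7
```

The wall-clock time stays at 9:00 AM in the chosen timezone, so the UTC time of the run shifts by an hour across daylight saving transitions.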
+ + + +--- + +## Constructing schedules for partitioned assets and jobs + +This section demonstrates how to use schedules with partitions. We'll cover: + +- Using a helper function to automatically construct schedules based on the partition's config +- Using to manually construct schedules + +### Automatically constructing schedules + +The follow examples demonstrate how to automatically construct schedules for partitioned assets and jobs using a helper function. These examples use , which will build a schedule with a cadence that matches the spacing of the partitions in the asset or job. + +This approach can be used with time or static-based partitions. + + + + +#### Partitioned assets + +This example demonstrates how to automatically construct a schedule for a time-partitioned asset using . + +```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_partitioned_asset_schedule endbefore=end_partitioned_asset_schedule +from dagster import ( + asset, + build_schedule_from_partitioned_job, + define_asset_job, + HourlyPartitionsDefinition, +) + + +@asset(partitions_def=HourlyPartitionsDefinition(start_date="2020-01-01-00:00")) +def hourly_asset(): ... + + +partitioned_asset_job = define_asset_job("partitioned_job", selection=[hourly_asset]) + + +asset_partitioned_schedule = build_schedule_from_partitioned_job( + partitioned_asset_job, +) +``` + + + + + + + + + + + + + + + + +
+ Notes + + If the partition has a timezone defined, the schedule will execute in + the timezone specified on the partitioned config. +
+ Related docs + + +
+ APIs in this example + +
    +
  • + +
  • +
  • + +
  • +
  • + +
  • +
  • + +
  • +
+
+ +
+
+#### Partitioned op jobs
+
+This example demonstrates how to construct a schedule for a time-partitioned op job using `build_schedule_from_partitioned_job`.
+
+```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_marker endbefore=end_marker
+from dagster import build_schedule_from_partitioned_job, job
+
+
+@job(config=my_partitioned_config)
+def do_stuff_partitioned(): ...
+
+
+do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
+    do_stuff_partitioned,
+)
+```
+
+ Notes + + If the partition has a timezone defined, the schedule will execute in + the timezone specified on the partitioned config. +
+ Related docs + + +
+ APIs in this example + + ,{" "} + +
+ +
+
+
+### Manually constructing schedules
+
+This example demonstrates how to manually construct a schedule for a job with a static partition from scratch using the `@schedule` decorator.
+
+Using `@schedule` allows for more flexibility in determining which partitions should be run by the schedule, rather than using `build_schedule_from_partitioned_job`, which automatically creates the schedule based on the partitioned config.
+
+```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_static_partition endbefore=end_static_partition
+from dagster import schedule, RunRequest
+
+
+@schedule(cron_schedule="0 0 * * *", job=continent_job)
+def continent_schedule():
+    for c in CONTINENTS:
+        yield RunRequest(run_key=c, partition_key=c)
+```
+
+ Related docs + + +
+ APIs in this example + + ,{" "} + +
+
+---
+
+## Testing schedules
+
+Refer to the [Testing schedules guide](/concepts/automation/schedules/testing) to view examples of tests alongside the schedules they target.
+
+---
+
+## Want more inspiration?
+
+If you're looking for additional inspiration, we recommend:
+
+- [**Dagster Open Platform**](https://github.com/dagster-io/dagster-open-platform), which is Dagster Labs' open-source data platform. This full-sized project contains real assets and other Dagster features used by the Dagster Labs team.
+- [**GitHub Discussions**](https://github.com/dagster-io/dagster/discussions), where you can ask questions and get inspired by the Dagster community
+- The [**Awesome Dagster** repository](https://github.com/dagster-io/awesome-dagster), which is a collection of all awesome things related to Dagster, including other users' projects, talks, articles, and more