document partitioned assets (#7466)
sryza committed Apr 26, 2022
1 parent 2a3595b commit 5a85d2c
Showing 16 changed files with 193 additions and 47 deletions.
@@ -15,7 +15,7 @@ Dagster supports data backfills for each partition or subsets of partitions. Aft

### Using Dagit

You can launch and monitor backfills of a job using the [Partitions tab](/concepts/partitions-schedules-sensors/partitions#the-partitions-tab).
You can launch and monitor backfills of a job using the [Partitions tab](/concepts/partitions-schedules-sensors/partitions#dagit-partitions-tab).

To launch a backfill, click the "Launch backfill" button at the top center of the Partitions tab. This opens the "Launch backfill" modal, which lets you select the set of partitions to launch the backfill over. A run will be launched for each partition.

127 changes: 87 additions & 40 deletions docs/content/concepts/partitions-schedules-sensors/partitions.mdx
@@ -1,11 +1,29 @@
---
title: Partitioned Jobs | Dagster
description: Dagster "Partitioned Jobs" enable launching backfills, where each partition processes a subset of data.
title: Partitioned Assets and Jobs | Dagster
description: Partitioned assets and jobs enable launching backfills, where each partition processes a subset of data.
---

# Partitioned Jobs
# Partitioned Assets and Jobs

## Relevant APIs
Partitioning allows a job or software-defined asset to correspond to a set of entities with identical structure but different parameters.

A _partitioned job_ is a job where each run corresponds to a partition key. Most commonly, each partition key represents a time window, so, when a job executes, it processes data within one of the time windows.

A _partitioned asset_ is an asset that's composed of a set of partitions, which can be materialized and tracked independently. Most commonly, each partition represents all the records in a data set that fall within a particular time window. Depending on where the asset is stored, each partition might correspond to a file or a slice of a table in a database.

It's common to construct a partitioned job that materializes a particular set of partitioned assets every time it runs.

Having defined a partitioned job or asset, you can:

- View runs by partition in Dagit.
- Define a [schedule](/concepts/partitions-schedules-sensors/schedules) that fills in a partition each time it runs. For example, a job might run each day and process the data that arrived during the previous day.
- Launch [backfills](/concepts/partitions-schedules-sensors/backfills), which are sets of runs that each process a different partition. For example, after making a code change, you might want to run your job on all time windows instead of just one of them.
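
To make the idea of time-window partition keys concrete, here is a minimal plain-Python sketch. It is not Dagster's implementation: the helper name `daily_partition_keys`, the `YYYY-MM-DD` key format, and the exclusive end date are assumptions made for illustration.

```python
from datetime import date, timedelta
from typing import List


def daily_partition_keys(start: date, end: date) -> List[str]:
    """Return one key per day in [start, end), formatted as YYYY-MM-DD."""
    # Hypothetical helper: clamp to zero so an end before start yields no keys.
    num_days = max((end - start).days, 0)
    return [(start + timedelta(days=i)).isoformat() for i in range(num_days)]


keys = daily_partition_keys(date(2022, 1, 1), date(2022, 1, 4))
print(keys)
```

A backfill over these keys would then amount to one run per entry in the list.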

---

## Partitioned Jobs

### Relevant APIs

| Name | Description |
| ---------------------------------------------------------- | --------------------------------------------------------------------------------------------------- |
@@ -18,20 +36,6 @@ description: Dagster "Partitioned Jobs" enable launching backfills, where each p
| <PyObject object="dynamic_partitioned_config" decorator /> | Decorator for constructing partitioned config for a set of partition keys that can grow over time. |
| <PyObject object="build_schedule_from_partitioned_job" /> | A function that constructs a schedule whose interval matches the partitioning of a partitioned job. |

## Overview

A _partitioned job_ is a job where each run corresponds to a "partition". The choice of partition determines the run's config. Most commonly, each partition is a time window, so, when a job executes, it processes data within one of the time windows.

Having defined a partitioned job, you can:

- View runs by partition in Dagit.
- Define a [schedule](/concepts/partitions-schedules-sensors/schedules) that fills in a partition each time it runs. For example, a job might run each day and process the data that arrived during the previous day.
- Launch [backfills](/concepts/partitions-schedules-sensors/backfills), which are sets of runs that each process a different partition. For example, after making a code change, you might want to re-run your job on every date that it has run on in the past.

---

## Defining Partitioned Jobs

You define a partitioned job by constructing a <PyObject object="PartitionedConfig" /> object and supplying it when you construct your job.

### Defining a Job with Time Partitions
@@ -94,6 +98,26 @@ def do_stuff_partitioned():
    process_data_for_date()
```

#### Dagit Partitions Tab

In Dagit, you can view runs by partition in the Partitions tab of a Job page.

<img
alt="Partitions Tab"
src="/images/concepts/partitions-schedules-sensors/partitions/partitioned-job.png"
/>

In the "Run Matrix", each column corresponds to one of the partitions in the job. The time listed corresponds to the start time of the partition. Each row corresponds to one of the steps in the job. You can click on an individual box to navigate to logs and run information for the step.

You can view and use partitions in the Dagit Launchpad tab for a job. In the top bar, you can select from the list of all available partitions. Within the config editor, the config for the selected partition will be populated.

In the screenshot below, we select the `2020-01-02` partition, and we can see that the run config for the partition has been populated in the editor.

<img
alt="Partitions in Dagit Launchpad"
src="/images/concepts/partitions-schedules-sensors/partitions/launchpad.png"
/>

In addition to the <PyObject object="daily_partitioned_config" decorator /> decorator, Dagster also provides <PyObject object="monthly_partitioned_config" decorator />, <PyObject object="weekly_partitioned_config" decorator />, and <PyObject object="hourly_partitioned_config" decorator />. See the API docs for each of these decorators for more information on how partitions are built based on different `start_date`, `minute_offset`, `hour_offset`, and `day_offset` inputs.

### Defining a Job with Static Partitions
@@ -129,7 +153,7 @@ def continent_job():
    continent_op()
```

## Creating Schedules from Partitioned Jobs
### Creating Schedules from Partitioned Jobs

It's common that, when you have a partitioned job, you want to run it on a schedule. For example, if your job has a partition for each date, you likely want to run that job every day, on the partition for that day.

@@ -267,34 +291,57 @@ def test_do_stuff_partitioned():
    assert do_stuff_partitioned.execute_in_process(partition_key="2020-01-01").success
```

## Partitions in Dagit
## Partitioned Assets

### The Partitions Tab
### Relevant APIs

In Dagit, you can view runs by partition in the Partitions tab of a Job page.
| Name                                              | Description                                                                       |
| ------------------------------------------------- | --------------------------------------------------------------------------------- |
| <PyObject object="PartitionsDefinition" />        | Superclass - defines the set of partitions that can be materialized for an asset. |
| <PyObject object="HourlyPartitionsDefinition" />  | A partitions definition with a partition for each hour.                           |
| <PyObject object="DailyPartitionsDefinition" />   | A partitions definition with a partition for each day.                            |
| <PyObject object="WeeklyPartitionsDefinition" />  | A partitions definition with a partition for each week.                           |
| <PyObject object="MonthlyPartitionsDefinition" /> | A partitions definition with a partition for each month.                          |
| <PyObject object="StaticPartitionsDefinition" />  | A partitions definition with a fixed set of partitions.                           |

In the "Run Matrix", each column corresponds to one of the partitions in the job. The time listed corresponds to the start time of the partition. Each row corresponds to one of the steps in the job.
A software-defined asset can be assigned a <PyObject object="PartitionsDefinition" />, which determines the set of partitions that compose it. Once an asset has a set of partitions, you can launch materializations of individual partitions, as well as view the materialization history by partition in Dagit.

<!-- This was generated from go/prod -->
Here's an asset with a partition for each day since the first day of 2022:

<Image
alt="Partitions Tab"
src="/images/concepts/partitions-schedules-sensors/partitions-page.png"
width={3808}
height={2414}
/>
```python file=/concepts/partitions_schedules_sensors/partitioned_asset.py
from dagster import DailyPartitionsDefinition, asset

You can click on an individual box to navigate to logs and run information for the step.

### Launching Partitioned Runs from the Launchpad
@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context):
    context.log.info(
        f"Processing asset partition '{context.output_asset_partition_key()}'"
    )
```

You can view and use partitions in the Dagit Launchpad tab for a job. In the top bar, you can select from the list of all available partitions. Within the config editor, the config for the selected partition will be populated.
When an asset is unpartitioned, the default IO manager stores it in a file whose location is based on the asset's key. When an asset is partitioned, the default IO manager stores each partition in a separate file, all underneath a directory whose location is based on the asset's key.
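
The layout described above can be sketched in plain Python. This is only an illustration under stated assumptions: the `storage_path` helper, the base directory, and the choice to name each partition's file after its partition key are all hypothetical, and the real IO manager's path scheme may differ.

```python
from pathlib import PurePosixPath
from typing import Optional, Sequence


def storage_path(
    base_dir: str, asset_key: Sequence[str], partition_key: Optional[str] = None
) -> PurePosixPath:
    """Hypothetical layout: one file per asset, or one file per partition."""
    # The asset key determines the location in both cases.
    asset_path = PurePosixPath(base_dir).joinpath(*asset_key)
    if partition_key is None:
        # Unpartitioned: the asset is stored in a single file.
        return asset_path
    # Partitioned: the asset path becomes a directory holding one file per partition.
    return asset_path / partition_key


print(storage_path("/tmp/storage", ["my_daily_partitioned_asset"]))
print(storage_path("/tmp/storage", ["my_daily_partitioned_asset"], "2022-01-01"))
```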

In the screenshot below, we select the `2020-05-01` partition, and we can see that the run config for the partition has been populated in the editor.
If you open up the "Definition" tab of the details page for a partitioned asset, you'll see a bar that represents all of the partitions for the asset. In this case, the bar is entirely gray, because none of the partitions have been materialized.

<Image
alt="Partitions in Dagit Launchpad"
src="/images/concepts/partitions-schedules-sensors/partitions-playground.png"
width={3808}
height={2414}
/>
<img src="/images/concepts/partitions-schedules-sensors/partitions/partitioned-asset.png" />

When materializing a partitioned asset, you choose which partitions to materialize, and Dagster will launch a run for each partition. If you choose more than one partition, the [Dagster Daemon](/deployment/guides/service#running-dagster-daemon) needs to be running to queue the multiple runs.

<img src="/images/concepts/partitions-schedules-sensors/partitions/rematerialize-partition.png" />

After you've materialized a partition, it will show up as green in the partitions bar.

<img src="/images/concepts/partitions-schedules-sensors/partitions/materialized-partitioned-asset.png" />

If you navigate to the "Activity" tab, you'll be able to see materializations by partition:

<img src="/images/concepts/partitions-schedules-sensors/partitions/materialized-partitioned-asset-activity.png" />

### Partition Dependencies

When a partitioned asset depends on another partitioned asset, each partition in the downstream asset depends on a partition or multiple partitions in the upstream asset.

A few rules govern partition-to-partition dependencies:

- When the upstream asset and downstream asset have the same <PyObject object="PartitionsDefinition" />, each partition in the downstream asset depends on the same partition in the upstream asset.
- When the upstream asset and downstream asset are both time window-partitioned, each partition in the downstream asset depends on all partitions in the upstream asset that intersect its time window. For example, if an asset with a <PyObject object="DailyPartitionsDefinition" /> depends on an asset with an <PyObject object="HourlyPartitionsDefinition" />, then partition `2022-04-12` of the daily asset would depend on 24 partitions of the hourly asset: `2022-04-12-00:00` through `2022-04-12-23:00`.
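
The daily-on-hourly rule can be illustrated with a short plain-Python sketch. It does not use or reproduce Dagster's mapping logic: the helper name `upstream_hourly_keys` is hypothetical, the key formats are taken from the example in the rule, and time zones and daylight saving transitions are ignored.

```python
from datetime import datetime, timedelta
from typing import List


def upstream_hourly_keys(daily_key: str) -> List[str]:
    """List the hourly keys whose windows fall inside the given day."""
    day_start = datetime.strptime(daily_key, "%Y-%m-%d")
    # One hourly window starts at each hour of the day.
    return [
        (day_start + timedelta(hours=h)).strftime("%Y-%m-%d-%H:%M")
        for h in range(24)
    ]


keys = upstream_hourly_keys("2022-04-12")
print(len(keys), keys[0], keys[-1])  # 24 2022-04-12-00:00 2022-04-12-23:00
```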
2 changes: 1 addition & 1 deletion docs/content/deployment/guides/service.mdx
@@ -25,7 +25,7 @@ In this configuration, Dagit will write execution logs to `$DAGSTER_HOME/logs` a

## Running dagster-daemon

If you're using [schedules](/concepts/partitions-schedules-sensors/schedules) or sensors, or want to set limits on the number of runs that can be executed at once, you'll want to also run a [dagster-daemon service](/deployment/dagster-daemon) as part of your deployment. To run this service locally, run the following command:
If you're using [schedules](/concepts/partitions-schedules-sensors/schedules), [sensors](/concepts/partitions-schedules-sensors/sensors), or [backfills](/concepts/partitions-schedules-sensors/backfills), or want to set limits on the number of runs that can be executed at once, you'll want to also run a [dagster-daemon service](/deployment/dagster-daemon) as part of your deployment. To run this service locally, run the following command:

```shell
pip install dagster
@@ -0,0 +1,40 @@
- path: partitioned-asset.png
  defs_file: examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_asset.py
  url: http://127.0.0.1:3000/instance/assets/my_daily_partitioned_asset?view=definition

- path: rematerialize-partition.png
  defs_file: examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_asset.py
  url: http://127.0.0.1:3000/instance/assets/my_daily_partitioned_asset?view=definition
  steps:
    - Click "Materialize"

- path: materialized-partitioned-asset.png
  defs_file: examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_asset.py
  url: http://127.0.0.1:3000/instance/assets/my_daily_partitioned_asset?view=definition
  steps:
    - Click "Materialize"
    - Launch a run
    - Wait for the run to complete
    - Refresh the page

- path: materialized-partitioned-asset_activity.png
  defs_file: examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_asset.py
  url: http://127.0.0.1:3000/instance/assets/my_daily_partitioned_asset?view=definition
  steps:
    - Click "Materialize"
    - Launch a run
    - Wait for the run to complete
    - Refresh the page
    - Navigate to the activity tab

- path: partitioned-job.png
  defs_file: examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_job.py
  url: http://127.0.0.1:3000/workspace/__repository__do_stuff_partitioned@partitioned_job.py/jobs/do_stuff_partitioned/partitions
  page_load_sleep: 5

- path: launchpad.png
  defs_file: examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/partitioned_job.py
  url: http://127.0.0.1:3000/workspace/__repository__do_stuff_partitioned@partitioned_job.py/jobs/do_stuff_partitioned/playground
  steps:
    - Select 2020-01-02 partition
34 changes: 32 additions & 2 deletions docs/sphinx/sections/api/apidocs/partitions.rst
@@ -1,7 +1,7 @@
.. currentmodule:: dagster

Partitions
==========
Partitioned Config
==================

.. autoclass:: PartitionedConfig
    :members:
@@ -18,9 +18,39 @@ Partitions

.. autofunction:: monthly_partitioned_config


Partitions Definitions
======================

.. autoclass:: PartitionsDefinition
    :members:

.. autoclass:: HourlyPartitionsDefinition
    :members:

.. autoclass:: DailyPartitionsDefinition
    :members:

.. autoclass:: WeeklyPartitionsDefinition
    :members:

.. autoclass:: MonthlyPartitionsDefinition
    :members:

.. autoclass:: TimeWindowPartitionsDefinition
    :members:

.. autoclass:: StaticPartitionsDefinition
    :members:


Partitioned Schedules
=====================

.. autofunction:: build_schedule_from_partitioned_job
    :noindex:


Legacy Functions
================

@@ -0,0 +1,8 @@
from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context):
    context.log.info(
        f"Processing asset partition '{context.output_asset_partition_key()}'"
    )
@@ -1,7 +1,12 @@
# isort: skip_file
from dagster import job
"""isort:skip_file"""
from dagster import job, op


@op(config_schema={"date": str})
def process_data_for_date(context):
    date = context.op_config["date"]
    context.log.info(f"processing data for {date}")

from .date_config_job import process_data_for_date

# start_partitioned_config
from dagster import daily_partitioned_config
@@ -0,0 +1,13 @@
from dagster import AssetGroup
from docs_snippets.concepts.partitions_schedules_sensors.partitioned_asset import (
my_daily_partitioned_asset,
)


def test_partitioned_asset():
    assert (
        AssetGroup([my_daily_partitioned_asset])
        .build_job("a")
        .execute_in_process(partition_key="2022-01-01")
        .success
    )
2 changes: 2 additions & 0 deletions python_modules/dagster/dagster/__init__.py
@@ -99,6 +99,7 @@
TableSchema,
TableSchemaMetadataValue,
TextMetadataValue,
TimeWindowPartitionsDefinition,
TypeCheck,
UrlMetadataValue,
WeeklyPartitionsDefinition,
@@ -573,6 +574,7 @@ def __dir__():
"DailyPartitionsDefinition",
"HourlyPartitionsDefinition",
"MonthlyPartitionsDefinition",
"TimeWindowPartitionsDefinition",
"WeeklyPartitionsDefinition",
"Partition",
"PartitionedConfig",
@@ -141,6 +141,7 @@
DailyPartitionsDefinition,
HourlyPartitionsDefinition,
MonthlyPartitionsDefinition,
TimeWindowPartitionsDefinition,
WeeklyPartitionsDefinition,
daily_partitioned_config,
hourly_partitioned_config,