Skip to content

Commit

Permalink
pass on partitions page for assets (#8355)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Jun 14, 2022
1 parent 8856374 commit d679440
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 55 deletions.
156 changes: 101 additions & 55 deletions docs/content/concepts/partitions-schedules-sensors/partitions.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,69 @@ Having defined a partitioned job or asset, you can:

---

## Partitioned Assets

### Relevant APIs

| Name | Description |
| ------------------------------------------------- | --------------------------------------------------------------------------------- |
| <PyObject object="PartitionsDefinition" /> | Superclass - defines the set of partitions that can be materialized for an asset. |
| <PyObject object="HourlyPartitionsDefinition" /> | A partitions definition with a partition for each hour. |
| <PyObject object="DailyPartitionsDefinition" /> | A partitions definition with a partition for each day. |
| <PyObject object="WeeklyPartitionsDefinition" /> | A partitions definition with a partition for each week. |
| <PyObject object="MonthlyPartitionsDefinition" /> | A partitions definition with a partition for each month. |
| <PyObject object="StaticPartitionsDefinition" /> | A partitions definition with a fixed set of partitions. |

A software-defined asset can be assigned a <PyObject object="PartitionsDefinition" />, which determines the set of partitions that compose it. Once an asset has a set of partitions, you can launch materializations of individual partitions and view the materialization history by partition in Dagit.

For example, below is an asset with a partition for each day since the first day of 2022:

```python file=/concepts/partitions_schedules_sensors/partitioned_asset.py
from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context):
context.log.info(
f"Processing asset partition '{context.asset_partition_key_for_output()}'"
)
```

When an asset is unpartitioned, the default IO manager stores it in a file whose location is based on the asset's key. When an asset is partitioned, the default IO manager stores each partition in a separate file, all underneath a directory whose location is based on the asset's key.

To view all partitions for an asset, open the **Definition** tab of the asset's details page. The bar in the **Partitions** section represents all of the partitions for the asset.

In the following image, the partitions bar is entirely gray. This is because none of the partitions have been materialized:

<img src="/images/concepts/partitions-schedules-sensors/partitions/partitioned-asset.png" />

### Materializating partitioned assets

When you materialize a partitioned asset, you choose which partitions to materialize, and Dagster will launch a run for each partition.

**Note**: If you choose more than one partition, the [Dagster Daemon](/deployment/guides/service#running-dagster-daemon) needs to be running to queue the multiple runs.

<img src="/images/concepts/partitions-schedules-sensors/partitions/rematerialize-partition.png" />

After you've materialized a partition, it will show up as green in the partitions bar.

<img src="/images/concepts/partitions-schedules-sensors/partitions/materialized-partitioned-asset.png" />

To view materializations by partition, navigate to the **Activity** tab:

<img src="/images/concepts/partitions-schedules-sensors/partitions/materialized-partitioned-asset-activity.png" />

### Partition Dependencies

When a partitioned asset depends on another partitioned asset, each partition in the downstream asset depends on a partition or multiple partitions in the upstream asset.

A few rules govern partition-to-partition dependencies:

- When the upstream asset and downstream asset have the same <PyObject object="PartitionsDefinition" />, each partition in the downstream asset depends on the same partition in the upstream asset.
- When the upstream asset and downstream asset are both time window-partitioned, each partition in the downstream asset depends on all partitions in the upstream asset that intersect its time window.

For example, if an asset with a <PyObject object="DailyPartitionsDefinition" /> depends on an asset with an <PyObject object="HourlyPartitionsDefinition" />, then partition `2022-04-12` of the daily asset the would depend on 24 partitions of the hourly asset: `2022-04-12-00:00` through `2022-04-12-23:00`.

## Partitioned Jobs

### Relevant APIs
Expand Down Expand Up @@ -199,6 +262,44 @@ def antarctica_schedule():

Refer to the [Schedules documentation](/concepts/partitions-schedules-sensors/schedules#schedules-from-partitioned-assets-and-jobs) for more info about constructing both schedule types.

## Partitioned Asset Job

A partitioned asset job is a job that materializes a particular set of partitioned assets every time it runs.

```python file=/concepts/partitions_schedules_sensors/partitioned_asset_job.py
from dagster import (
AssetSelection,
HourlyPartitionsDefinition,
asset,
define_asset_job,
repository,
)

hourly_partitions_def = HourlyPartitionsDefinition(start_date="2022-05-31-00:00")


@asset(partitions_def=hourly_partitions_def)
def asset1():
...


@asset(partitions_def=hourly_partitions_def)
def asset2():
...


partitioned_asset_job = define_asset_job(
name="asset_1_and_2_job",
selection=AssetSelection.assets(asset1, asset2),
partitions_def=hourly_partitions_def,
)


@repository
def repo():
return [asset1, asset2, partitioned_asset_job]
```

## Testing

### Testing Partitioned Config
Expand Down Expand Up @@ -291,61 +392,6 @@ def test_do_stuff_partitioned():
assert do_stuff_partitioned.execute_in_process(partition_key="2020-01-01").success
```

## Partitioned Assets

### Relevant APIs

| Name | Description |
| ----------------------------------------------------------- | --------------------------------------------------------------------------------- |
| <PyObject object="PartitionsDefinition" decorator /> | Superclass - defines the set of partitions that can be materialized for an asset. |
| <PyObject object="HourlyPartitionsDefinition" decorator /> | A partitions definition with a partition for each hour. |
| <PyObject object="DailyPartitionsDefinition" decorator /> | A partitions definition with a partition for each day. |
| <PyObject object="WeeklyPartitionsDefinition" decorator /> | A partitions definition with a partition for each week. |
| <PyObject object="MonthlyPartitionsDefinition" decorator /> | A partitions definition with a partition for each month. |
| <PyObject object="StaticPartitionsDefinition" decorator /> | A partitions definition with a fixed set of partitions. |

A software-defined asset can be assigned a <PyObject object="PartitionsDefinition" />, which determines the set of partitions that compose it. Once an asset has a set of partitions, you can launch materializations of individual partitions, as well as view the materialization history by partition in Dagit.

Here's an asset with a partition for each day since the first day of 2022:

```python file=/concepts/partitions_schedules_sensors/partitioned_asset.py
from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context):
context.log.info(
f"Processing asset partition '{context.asset_partition_key_for_output()}'"
)
```

When an asset is unpartitioned, the default IO manager stores it in a file whose location is based on the asset's key. When an asset is partitioned, the default IO manager stores each partition in a separate file, all underneath a directory whose location is based on the asset's key.

If you open up the "Definition" tab of the details page for a partitioned asset, you'll see a bar that represents all of the partitions for the asset. In this case, the bar is entirely gray, because none of the partitions have been materialized.

<img src="/images/concepts/partitions-schedules-sensors/partitions/partitioned-asset.png" />

When materializing a partitioned asset, you choose which partitions to materialize, and Dagster will launch a run for each partition. If you choose more than one partition, the [Dagster Daemon](/deployment/guides/service#running-dagster-daemon) needs to be running to queue the multiple runs.

<img src="/images/concepts/partitions-schedules-sensors/partitions/rematerialize-partition.png" />

After you've materialized a partition, it will show up as green in the partitions bar.

<img src="/images/concepts/partitions-schedules-sensors/partitions/materialized-partitioned-asset.png" />

If you navigate to the "Activity" tab, you'll be able to see materializations by partition:

<img src="/images/concepts/partitions-schedules-sensors/partitions/materialized-partitioned-asset-activity.png" />

### Partition Dependencies

When a partitioned asset depends on another partitioned asset, each partition in the downstream asset depends on a partition or multiple partitions in the upstream asset.

A few rules govern partition-to-partition dependencies:

- When the upstream asset and downstream asset have the same <PyObject object="PartitionsDefinition" />, each partition in the downstream asset depends on the same partition in the upstream asset.
- When the upstream asset and downstream asset are both time window-partitioned, each partition in the downstream asset depends on all partitions in the upstream asset that intersect its time window. For example, if an asset with a <PyObject object="DailyPartitionsDefinition" /> depends on an asset with an <PyObject object="HourlyPartitionsDefinition" />, then partition `2022-04-12` of the daily asset the would depend on 24 partitions of the hourly asset: `2022-04-12-00:00` through `2022-04-12-23:00`.

## See it in action

For more examples of partitions, check out the following in our [Hacker News example](https://github.com/dagster-io/dagster/tree/master/examples/hacker_news):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from dagster import (
AssetSelection,
HourlyPartitionsDefinition,
asset,
define_asset_job,
repository,
)

hourly_partitions_def = HourlyPartitionsDefinition(start_date="2022-05-31-00:00")


@asset(partitions_def=hourly_partitions_def)
def asset1():
...


@asset(partitions_def=hourly_partitions_def)
def asset2():
...


partitioned_asset_job = define_asset_job(
name="asset_1_and_2_job",
selection=AssetSelection.assets(asset1, asset2),
partitions_def=hourly_partitions_def,
)


@repository
def repo():
return [asset1, asset2, partitioned_asset_job]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from docs_snippets.concepts.partitions_schedules_sensors.partitioned_asset_job import (
repo,
)


def test():
assert repo.get_job("asset_1_and_2_job")

1 comment on commit d679440

@vercel
Copy link

@vercel vercel bot commented on d679440 Jun 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.