Skip to content

Commit

Permalink
Asset Observations Doc (#6630)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Feb 17, 2022
1 parent 7b0b436 commit c71062f
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 1 deletion.
4 changes: 4 additions & 0 deletions docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@
{
"title": "Asset Materializations",
"path": "/concepts/assets/asset-materializations"
},
{
"title": "Asset Observations",
"path": "/concepts/assets/asset-observations"
}
]
}
Expand Down
99 changes: 99 additions & 0 deletions docs/content/concepts/assets/asset-observations.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
---
title: Asset Observations | Dagster
description: Dagster provides functionality to record metadata about assets.
---

# Asset Observations

An asset observation is an event that records metadata about a given asset. Unlike asset materializations, asset observations do not signify that an asset has been mutated.

## Relevant APIs

| Name | Description |
| -------------------------------------- | -------------------------------------------------------------------- |
| <PyObject object="AssetObservation" /> | Dagster event indicating that an asset's metadata has been recorded. |
| <PyObject object="AssetKey" /> | A unique identifier for a particular external asset. |

## Overview

<PyObject object="AssetObservation" /> events are used record metadata in Dagster
about a given asset. Asset observation events can be yielded at runtime within ops
and assets. An asset must be defined using the <PyObject
object="asset"
decorator
/> decorator or have existing materializations in order for its observations to be
displayed.

## Yielding an AssetObservation from an Op

To make Dagster aware that we have recorded metadata about an asset, we can yield an <PyObject object="AssetObservation" /> event:

```python file=/concepts/assets/observations.py startafter=start_observation_asset_marker_0 endbefore=end_observation_asset_marker_0
from dagster import AssetObservation


@op
def observation_op():
df = read_df()
yield AssetObservation(asset_key="observation_asset", metadata={"num_rows": len(df)})
yield Output(5)
```

We should now see an observation event in the event log when we execute this asset.

<Image
alt="asset-observation"
src="/images/concepts/assets/observation.png"
width={1417}
height={917}
/>

### Attaching Metadata to an AssetObservation

There are a variety of types of metadata that can be associated with an observation event, all through the <PyObject object="EventMetadataEntry" /> class. Each observation event optionally takes a dictionary of metadata entries that are then displayed in the event log and the [Asset Details](/concepts/dagit/dagit#asset-details) page. Check our API docs for <PyObject object="EventMetadataEntry" /> for more details on the types of event metadata available.

```python file=concepts/assets/observations.py startafter=start_observation_asset_marker_2 endbefore=end_observation_asset_marker_2
from dagster import op, AssetObservation, Output, EventMetadata


@op
def observes_dataset_op():
df = read_df()
remote_storage_path = persist_to_storage(df)
yield AssetObservation(
asset_key="my_dataset",
metadata={
"text_metadata": "Text-based metadata for this event",
"path": EventMetadata.path(remote_storage_path),
"dashboard_url": EventMetadata.url("http://mycoolsite.com/url_for_my_data"),
"size (bytes)": calculate_bytes(df),
},
)
yield AssetMaterialization(asset_key="my_dataset")
yield Output(remote_storage_path)
```

In the [Asset Details](/concepts/dagit/dagit#asset-details) page, we can see observations in the Asset Activity table.

<Image
alt="asset-activity-observation"
src="/images/concepts/assets/asset-activity-observation.png"
width={1758}
height={1146}
/>

### Specifying a partition for an AssetObservation

If you are observing a single slice of an asset (e.g. a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the `partition` argument on the object.

```python file=/concepts/assets/observations.py startafter=start_partitioned_asset_observation endbefore=end_partitioned_asset_observation
from dagster import op, AssetMaterialization, Output


@op(config_schema={"date": str})
def partitioned_dataset_op(context):
partition_date = context.op_config["date"]
df = read_df_for_date(partition_date)
yield AssetObservation(asset_key="my_partitioned_dataset", partition=partition_date)
yield Output(df)
```
26 changes: 25 additions & 1 deletion docs/content/concepts/ops-jobs-graphs/op-events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Within the body of an op, it is possible to communicate with the Dagster framewo
| ------------------------------------------ | ------------------------------------------------------------- |
| <PyObject object="Output" /> | Dagster event used to yield an output from an op |
| <PyObject object="AssetMaterialization" /> | Dagster event indicating that an op has materialized an asset |
| <PyObject object="AssetObservation" /> | Dagster event indicating that an op has observed an asset |
| <PyObject object="ExpectationResult" /> | Dagster event representing the result of a data quality check |
| <PyObject object="Failure" /> | Dagster exception indicating that a failure has occurred |
| <PyObject object="RetryRequested" /> | Dagster exception requesting the step to be retried |
Expand All @@ -27,7 +28,7 @@ It is also possible to raise Dagster-specific exceptions, which will indicate to

Often, it may be useful to attach some arbitrary information to an event or exception that is not captured by its basic parameters. Through the <PyObject object="EventMetadataEntry"/> object, we provide a consistent interface for specifying this metadata on a variety of events. Depending on the type of the data, these entries will be rendered in Dagit in a more useful format than a simple unstructured string.

The <PyObject object="AssetMaterialization" />, <PyObject object="ExpectationResult" />, and <PyObject object="Failure" /> objects each accept a `metadata` parameter, which maps string labels to structured values. <PyObject object="Output"/> also accepts this parameter, although this functionality is currently experimental may change in the future.
The <PyObject object="AssetMaterialization" />, <PyObject object="AssetObservation" />, <PyObject object="ExpectationResult" />, and <PyObject object="Failure" /> objects each accept a `metadata` parameter, which maps string labels to structured values. <PyObject object="Output"/> also accepts this parameter, although this functionality is currently experimental may change in the future.

We provide support for a wide variety of potentially useful metadata types, including simple datatypes (`EventMetadataEntry.float`, `EventMetadataEntry.int`, `EventMetadataEntry.text`), as well as more complex information such as markdown and json (`EventMetadataEntry.md`, `EventMetadataEntry.json`).

Expand Down Expand Up @@ -140,6 +141,29 @@ def my_metadata_materialization_op(context):
yield Output(remote_storage_path)
```

### Asset Observations

<PyObject object="AssetObservation" /> events record metadata about assets. Unlike
asset materializations, asset observations do not signify that an asset has been
mutated.

Within ops and assets, you can yield <PyObject object="AssetObservation" /> events at runtime. Similar to attaching metadata to asset materializations, asset observations accept a `metadata` parameter, allowing you to track specific properties of an asset over time.

```python file=/concepts/assets/observations.py startafter=start_observation_asset_marker_0 endbefore=end_observation_asset_marker_0
from dagster import AssetObservation


@op
def observation_op():
df = read_df()
yield AssetObservation(asset_key="observation_asset", metadata={"num_rows": len(df)})
yield Output(5)
```

In the example above, an observation tracks the number of rows in an asset persisted to storage. This information can then be viewed on the [Asset Details](/concepts/dagit/dagit#asset-details) page.

To learn more about asset observations, check out the [Asset Observation](/concepts/assets/asset-observations) documentation.

### Expectation Results

Ops can emit structured events to represent the results of a data quality test. The data quality event class is the <PyObject object="ExpectationResult" />. To generate an expectation result, we can yield an <PyObject object="ExpectationResult" /> event in our op.
Expand Down
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
14 changes: 14 additions & 0 deletions docs/screenshot_capture/screenshots.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,20 @@
- click on the run
vetted: false

- path: concepts/assets/observation.png
defs_file: examples/docs_snippets/docs_snippets/concepts/assets/observations.py
steps:
- run my_observation_job
- click on the run
vetted: false

- path: concepts/assets/asset-activity-observation.png
defs_file: examples/docs_snippets/docs_snippets/concepts/assets/observations.py
steps:
- run my_dataset_job
- View the asset details page for "my_dataset"
vetted: false

- path: concepts/assets/asset-lineage.png
url: https://demo.elementl.show/instance/assets/snowflake/hackernews/comments
vetted: false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""isort:skip_file"""
# pylint: disable=unused-argument,reimported
from dagster import op, job


def read_df():
return range(372)


def read_df_for_date(_):
return 1


def persist_to_storage(df):
return "tmp"


def calculate_bytes(df):
return 1.0


# start_observation_asset_marker_0
from dagster import AssetObservation


@op
def observation_op():
df = read_df()
yield AssetObservation(asset_key="observation_asset", metadata={"num_rows": len(df)})
yield Output(5)


# end_observation_asset_marker_0

# start_partitioned_asset_observation
from dagster import op, AssetMaterialization, Output


@op(config_schema={"date": str})
def partitioned_dataset_op(context):
partition_date = context.op_config["date"]
df = read_df_for_date(partition_date)
yield AssetObservation(asset_key="my_partitioned_dataset", partition=partition_date)
yield Output(df)


# end_partitioned_asset_observation


# start_observation_asset_marker_2
from dagster import op, AssetObservation, Output, EventMetadata


@op
def observes_dataset_op():
df = read_df()
remote_storage_path = persist_to_storage(df)
yield AssetObservation(
asset_key="my_dataset",
metadata={
"text_metadata": "Text-based metadata for this event",
"path": EventMetadata.path(remote_storage_path),
"dashboard_url": EventMetadata.url("http://mycoolsite.com/url_for_my_data"),
"size (bytes)": calculate_bytes(df),
},
)
yield AssetMaterialization(asset_key="my_dataset")
yield Output(remote_storage_path)


# end_observation_asset_marker_2


@job
def my_observation_job():
observation_op()


@job
def my_dataset_job():
observes_dataset_op()
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dagster import build_op_context
from docs_snippets.concepts.assets.observations import (
observation_op,
observes_dataset_op,
partitioned_dataset_op,
)


def test_ops_compile_and_execute():
observation_op()
observes_dataset_op()

context = build_op_context(config={"date": "2020-01-01"})
partitioned_dataset_op(context)

1 comment on commit c71062f

@vercel
Copy link

@vercel vercel bot commented on c71062f Feb 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.