Skip to content

Commit

Permalink
Migrate user event docs to use log_event, various docs cleanups (#6772)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Feb 24, 2022
1 parent b26ba51 commit a360bbd
Show file tree
Hide file tree
Showing 17 changed files with 416 additions and 221 deletions.
98 changes: 58 additions & 40 deletions docs/content/concepts/assets/asset-materializations.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Op outputs often correspond to assets. For example, an op might be responsible f

Assets can also have partitions, which refer to slices of the overall asset. The simplest example would be a table that has a partition for each day. A given op execution may simply write a single day's worth of data to that table, rather than dropping the entire table and replacing it with new data.

Dagster lets you track the interactions between ops, outputs, and assets over time and view them in the Dagit [Asset Catalog](/concepts/dagit/dagit#assets). Every asset has a "key", which serves as a unique identifier for that particular entity. The act of creating or updating the contents of an asset is called a "materialization", and Dagster tracks these materializations using <PyObject object="AssetMaterialization" /> events. These events can either be yielded by the user at runtime, or automatically created by Dagster in cases where an <PyObject object="AssetKey" /> has been referenced by an op output.
Dagster lets you track the interactions between ops, outputs, and assets over time and view them in the Dagit [Asset Catalog](/concepts/dagit/dagit#assets). Every asset has a "key", which serves as a unique identifier for that particular entity. The act of creating or updating the contents of an asset is called a "materialization", and Dagster tracks these materializations using <PyObject object="AssetMaterialization" /> events. These events can either be logged by the user at runtime, or automatically created by Dagster in cases where an <PyObject object="AssetKey" /> has been referenced by an op output.

## Relevant APIs

Expand All @@ -27,15 +27,15 @@ There are two general patterns for dealing with assets when using Dagster:
- Put the logic to write/store assets inside the body of an op.
- Focus the op purely on business logic, and delegate the logic to write/store assets to an [IOManager](/concepts/io-management/io-managers).

Regardless of which pattern you are using, <PyObject module="dagster" object="AssetMaterialization" /> events are used to communicate to Dagster that a materialization has occurred. You can create these events either by explicitly yielding them at runtime, or (using an experimental interface), have Dagster automatically generate them by defining that a given op output corresponds to a given <PyObject module="dagster" object="AssetKey" />.
Regardless of which pattern you are using, <PyObject module="dagster" object="AssetMaterialization" /> events are used to communicate to Dagster that a materialization has occurred. You can create these events either by explicitly logging them at runtime, or (using an experimental interface), have Dagster automatically generate them by defining that a given op output corresponds to a given <PyObject module="dagster" object="AssetKey" />.

## Explicit AssetMaterializations

One way of recording materialization events is to explicitly yield <PyObject module="dagster" object="AssetMaterialization" /> events at runtime. These events should be co-located with your materialization logic, meaning if you store your object within your op body, then you should yield from within that op, and if you store your object using an <PyObject module="dagster" object="IOManager" />, then you should yield the event from your manager.
One way of recording materialization events is to log <PyObject module="dagster" object="AssetMaterialization" /> events at runtime. These events should be co-located with your materialization logic, meaning if you store your object within your op body, then you should log from within that op, and if you store your object using an <PyObject module="dagster" object="IOManager" />, then you should log the event from your manager.

### Yielding an AssetMaterialization from a Op
### Logging an AssetMaterialization from a Op

To make Dagster aware that we materialized an asset in our op, we can yield an <PyObject module="dagster" object="AssetMaterialization" /> event. This would involve changing the following op:
To make Dagster aware that we materialized an asset in our op, we can log an <PyObject module="dagster" object="AssetMaterialization" /> event using the method <PyObject object="OpExecutionContext" method="log_event" />. This would involve changing the following op:

```python file=/concepts/assets/materialization_ops.py startafter=start_materialization_ops_marker_0 endbefore=end_materialization_ops_marker_0
from dagster import op
Expand All @@ -51,19 +51,19 @@ def my_simple_op():
into something like this:

```python file=/concepts/assets/materialization_ops.py startafter=start_materialization_ops_marker_1 endbefore=end_materialization_ops_marker_1
from dagster import op, Output
from dagster import AssetMaterialization, op


@op
def my_materialization_op(context):
df = read_df()
remote_storage_path = persist_to_storage(df)
yield AssetMaterialization(asset_key="my_dataset", description="Persisted result to storage")
yield Output(remote_storage_path)
context.log_event(
AssetMaterialization(asset_key="my_dataset", description="Persisted result to storage")
)
return remote_storage_path
```

Note: Our materialization op must now explicitly yield an <PyObject module="dagster" object="Output" /> event instead of relying on the implicit conversion of the return value into an <PyObject module="dagster" object="Output" /> event.

We should now see a materialization event in the event log when we execute a job with this op.

<!-- This was generated with:
Expand All @@ -77,11 +77,14 @@ width={3808}
height={2414}
/>

### Yielding an AssetMaterialization from an IOManager
### Logging an AssetMaterialization from an IOManager

To record that an <PyObject object ="IOManager"/> has mutated or created an asset, we can yield an <PyObject module="dagster" object="AssetMaterialization" /> event from its `handle_output` method.
To record that an <PyObject object ="IOManager"/> has mutated or created an asset, we can log an <PyObject module="dagster" object="AssetMaterialization" /> event from its `handle_output` method. We do this via the method <PyObject object="OutputContext" method="log_event" />.

```python file=/concepts/assets/materialization_io_managers.py startafter=start_marker_0 endbefore=end_marker_0
from dagster import AssetMaterialization, IOManager


class PandasCsvIOManager(IOManager):
def load_input(self, context):
file_path = os.path.join("my_base_dir", context.step_key, context.name)
Expand All @@ -92,8 +95,10 @@ class PandasCsvIOManager(IOManager):

obj.to_csv(file_path)

yield AssetMaterialization(
asset_key=AssetKey(file_path), description="Persisted result to storage."
context.log_event(
AssetMaterialization(
asset_key=AssetKey(file_path), description="Persisted result to storage."
)
)
```

Expand All @@ -104,29 +109,34 @@ There are a variety of types of metadata that can be associated with a materiali
#### Example: Op body

```python file=concepts/assets/materialization_ops.py startafter=start_materialization_ops_marker_2 endbefore=end_materialization_ops_marker_2
from dagster import op, AssetMaterialization, Output, MetadataValue
from dagster import op, AssetMaterialization, MetadataValue


@op
def my_metadata_materialization_op(context):
df = read_df()
remote_storage_path = persist_to_storage(df)
yield AssetMaterialization(
asset_key="my_dataset",
description="Persisted result to storage",
metadata={
"text_metadata": "Text-based metadata for this event",
"path": MetadataValue.path(remote_storage_path),
"dashboard_url": MetadataValue.url("http://mycoolsite.com/url_for_my_data"),
"size (bytes)": calculate_bytes(df),
},
context.log_event(
AssetMaterialization(
asset_key="my_dataset",
description="Persisted result to storage",
metadata={
"text_metadata": "Text-based metadata for this event",
"path": MetadataValue.path(remote_storage_path),
"dashboard_url": MetadataValue.url("http://mycoolsite.com/url_for_my_data"),
"size (bytes)": calculate_bytes(df),
},
)
)
yield Output(remote_storage_path)
return remote_storage_path
```

#### Example: IOManager

```python file=concepts/assets/materialization_io_managers.py startafter=start_marker_1 endbefore=end_marker_1
from dagster import AssetMaterialization, IOManager


class PandasCsvIOManagerWithAsset(IOManager):
def load_input(self, context):
file_path = os.path.join("my_base_dir", context.step_key, context.name)
Expand All @@ -137,13 +147,15 @@ class PandasCsvIOManagerWithAsset(IOManager):

obj.to_csv(file_path)

yield AssetMaterialization(
asset_key=AssetKey(file_path),
description="Persisted result to storage.",
metadata={
"number of rows": obj.shape[0],
"some_column mean": obj["some_column"].mean(),
},
context.log_event(
AssetMaterialization(
asset_key=AssetKey(file_path),
description="Persisted result to storage.",
metadata={
"number of rows": obj.shape[0],
"some_column mean": obj["some_column"].mean(),
},
)
)
```

Expand All @@ -154,16 +166,16 @@ Check our API docs for <PyObject module="dagster" object="MetadataEntry" /> for
If you are materializing a single slice of an asset (e.g. a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the `partition` argument on the object.

```python file=/concepts/assets/materialization_ops.py startafter=start_partitioned_asset_materialization endbefore=end_partitioned_asset_materialization
from dagster import op, AssetMaterialization, Output
from dagster import op, AssetMaterialization


@op(config_schema={"date": str})
def my_partitioned_asset_op(context):
partition_date = context.op_config["date"]
df = read_df_for_date(partition_date)
remote_storage_path = persist_to_storage(df)
yield AssetMaterialization(asset_key="my_dataset", partition=partition_date)
yield Output(remote_storage_path)
context.log_event(AssetMaterialization(asset_key="my_dataset", partition=partition_date))
return remote_storage_path
```

## Linking Op Outputs to Assets <Experimental />
Expand All @@ -178,15 +190,15 @@ from dagster import op, Output, AssetMaterialization
def my_asset_op(context):
df = read_df()
persist_to_storage(df)
yield AssetMaterialization(asset_key="my_dataset")
yield Output(df)
context.log_event(AssetMaterialization(asset_key="my_dataset"))
return df
```

In this case, the <PyObject object="AssetMaterialization" /> and the <PyObject object="Output" /> events both correspond to the same data, the dataframe that we have created. With this in mind, we can simplify the above code, and provide useful information to the Dagster framework, by making this link between the `my_dataset` asset and the output of this op explicit.

Just as there are two places in which you can yield runtime <PyObject object="AssetMaterialization" /> events (within an op body and within an IOManager), we provide two different interfaces for linking an op output to to an asset. Regardless of which you choose, every time the op runs and yields that output, an <PyObject object="AssetMaterialization" /> event will automatically be created to record this information.
Just as there are two places in which you can log runtime <PyObject object="AssetMaterialization" /> events (within an op body and within an IOManager), we provide two different interfaces for linking an op output to to an asset. Regardless of which you choose, every time the op runs and logs that output, an <PyObject object="AssetMaterialization" /> event will automatically be created to record this information.

If you specified any metadata entries on the <PyObject object="Output" /> event while yielding it (see: [Op Event Docs](/concepts/ops-jobs-graphs/op-events#attaching-metadata-to-outputs)), these entries will automatically be attached to the materialization event for this asset.
If you use an <PyObject object="Output" /> event to yield your output, and specified any metadata entries on it, (see: [Op Event Docs](/concepts/ops-jobs-graphs/op-events#attaching-metadata-to-outputs)), these entries will automatically be attached to the materialization event for this asset.

### Linking assets to an Output Definition <Experimental />

Expand All @@ -200,7 +212,7 @@ from dagster import op, Output, Out, AssetKey
def my_constant_asset_op(context):
df = read_df()
persist_to_storage(df)
yield Output(df)
return df
```

### Linking assets to outputs with an IOManager <Experimental />
Expand All @@ -210,6 +222,9 @@ If you've defined a custom <PyObject object="IOManager"/> to handle storing your
Similar to the above interface, this function takes an <PyObject object="OutputContext"/> and returns an <PyObject object="AssetKey"/>. The following example functions nearly identically to `PandasCsvIOManagerWithMetadata` from the [runtime example](/concepts/assets/asset-materializations#example-iomanager) above.

```python file=/concepts/assets/materialization_io_managers.py startafter=start_asset_def endbefore=end_asset_def
from dagster import AssetKey, IOManager, MetadataEntry


class PandasCsvIOManagerWithOutputAsset(IOManager):
def load_input(self, context):
file_path = os.path.join("my_base_dir", context.step_key, context.name)
Expand Down Expand Up @@ -239,6 +254,9 @@ If you are already specifying a `get_output_asset_key` function on your <PyObjec
Then, you can calculate the asset partitions that a particular output will correspond to by reading this output configuration in `get_output_asset_partitions`:

```python file=/concepts/assets/materialization_io_managers.py startafter=start_partitioned_asset_def endbefore=end_partitioned_asset_def
from dagster import AssetKey, IOManager, MetadataEntry


class PandasCsvIOManagerWithOutputAssetPartitions(IOManager):
def load_input(self, context):
file_path = os.path.join("my_base_dir", context.step_key, context.name)
Expand Down
50 changes: 28 additions & 22 deletions docs/content/concepts/assets/asset-observations.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,28 @@ An asset observation is an event that records metadata about a given asset. Unli
## Overview

<PyObject object="AssetObservation" /> events are used record metadata in Dagster
about a given asset. Asset observation events can be yielded at runtime within ops
about a given asset. Asset observation events can be logged at runtime within ops
and assets. An asset must be defined using the <PyObject
object="asset"
decorator
/> decorator or have existing materializations in order for its observations to be
displayed.

## Yielding an AssetObservation from an Op
## Logging an AssetObservation from an Op

To make Dagster aware that we have recorded metadata about an asset, we can yield an <PyObject object="AssetObservation" /> event:
To make Dagster aware that we have recorded metadata about an asset, we can log an <PyObject object="AssetObservation" /> event from within an op. To do this, we use the method <PyObject object="OpExecutionContext" method="log_event" /> on the context:

```python file=/concepts/assets/observations.py startafter=start_observation_asset_marker_0 endbefore=end_observation_asset_marker_0
from dagster import AssetObservation
from dagster import AssetObservation, op


@op
def observation_op():
def observation_op(context):
df = read_df()
yield AssetObservation(asset_key="observation_asset", metadata={"num_rows": len(df)})
yield Output(5)
context.log_event(
AssetObservation(asset_key="observation_asset", metadata={"num_rows": len(df)})
)
return 5
```

We should now see an observation event in the event log when we execute this asset.
Expand All @@ -53,24 +55,26 @@ height={917}
There are a variety of types of metadata that can be associated with an observation event, all through the <PyObject object="MetadataEntry" /> class. Each observation event optionally takes a dictionary of metadata entries that are then displayed in the event log and the [Asset Details](/concepts/dagit/dagit#asset-details) page. Check our API docs for <PyObject object="MetadataEntry" /> for more details on the types of event metadata available.

```python file=concepts/assets/observations.py startafter=start_observation_asset_marker_2 endbefore=end_observation_asset_marker_2
from dagster import op, AssetObservation, Output, EventMetadata
from dagster import op, AssetObservation, EventMetadata


@op
def observes_dataset_op():
def observes_dataset_op(context):
df = read_df()
remote_storage_path = persist_to_storage(df)
yield AssetObservation(
asset_key="my_dataset",
metadata={
"text_metadata": "Text-based metadata for this event",
"path": EventMetadata.path(remote_storage_path),
"dashboard_url": EventMetadata.url("http://mycoolsite.com/url_for_my_data"),
"size (bytes)": calculate_bytes(df),
},
context.log_event(
AssetObservation(
asset_key="my_dataset",
metadata={
"text_metadata": "Text-based metadata for this event",
"path": EventMetadata.path(remote_storage_path),
"dashboard_url": EventMetadata.url("http://mycoolsite.com/url_for_my_data"),
"size (bytes)": calculate_bytes(df),
},
)
)
yield AssetMaterialization(asset_key="my_dataset")
yield Output(remote_storage_path)
context.log_event(AssetMaterialization(asset_key="my_dataset"))
return remote_storage_path
```

In the [Asset Details](/concepts/dagit/dagit#asset-details) page, we can see observations in the Asset Activity table.
Expand All @@ -87,13 +91,15 @@ height={1146}
If you are observing a single slice of an asset (e.g. a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the `partition` argument on the object.

```python file=/concepts/assets/observations.py startafter=start_partitioned_asset_observation endbefore=end_partitioned_asset_observation
from dagster import op, AssetMaterialization, Output
from dagster import op, AssetMaterialization


@op(config_schema={"date": str})
def partitioned_dataset_op(context):
partition_date = context.op_config["date"]
df = read_df_for_date(partition_date)
yield AssetObservation(asset_key="my_partitioned_dataset", partition=partition_date)
yield Output(df)
context.log_event(
AssetObservation(asset_key="my_partitioned_dataset", partition=partition_date)
)
return df
```

1 comment on commit a360bbd

@vercel
Copy link

@vercel vercel bot commented on a360bbd Feb 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.