Skip to content

Commit

Permalink
monitored_asset_selection param for multi-asset sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Jan 18, 2023
1 parent 1c12e96 commit fdabcde
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 161 deletions.
Expand Up @@ -99,7 +99,7 @@ In the body of the sensor, you have access to the materialization event records

```python file=/concepts/partitions_schedules_sensors/sensors/asset_sensors.py startafter=start_multi_asset_sensor_marker endbefore=end_multi_asset_sensor_marker
@multi_asset_sensor(
asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")],
monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
job=my_job,
)
def asset_a_and_b_sensor(context):
Expand All @@ -115,7 +115,7 @@ You can also return a <PyObject object="SkipReason" /> to document why the senso

```python file=/concepts/partitions_schedules_sensors/sensors/asset_sensors.py startafter=start_multi_asset_sensor_w_skip_reason endbefore=end_multi_asset_sensor_w_skip_reason
@multi_asset_sensor(
asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")],
monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
job=my_job,
)
def asset_a_and_b_sensor_with_skip_reason(context):
Expand Down Expand Up @@ -156,7 +156,10 @@ The example below monitors two assets with the same daily partitions definition.

```python file=/concepts/partitions_schedules_sensors/sensors/asset_sensors.py startafter=start_multi_asset_sensor_AND endbefore=end_multi_asset_sensor_AND
@multi_asset_sensor(
asset_keys=[AssetKey("upstream_daily_1"), AssetKey("upstream_daily_2")],
monitored_assets=[
AssetKey("upstream_daily_1"),
AssetKey("upstream_daily_2"),
],
job=downstream_daily_job,
)
def trigger_daily_asset_if_both_upstream_partitions_materialized(context):
Expand Down Expand Up @@ -189,7 +192,10 @@ The following example monitors two upstream daily-partitioned assets, kicking of

```python file=/concepts/partitions_schedules_sensors/sensors/asset_sensors.py startafter=start_multi_asset_sensor_OR endbefore=end_multi_asset_sensor_OR
@multi_asset_sensor(
asset_keys=[AssetKey("upstream_daily_1"), AssetKey("upstream_daily_2")],
monitored_assets=[
AssetKey("upstream_daily_1"),
AssetKey("upstream_daily_2"),
],
job=downstream_daily_job,
)
def trigger_daily_asset_when_any_upstream_partitions_have_new_materializations(context):
Expand Down Expand Up @@ -308,7 +314,9 @@ If the <PyObject object="PartitionsDefinition"/> of the monitored assets differs
If a partition mapping is not defined, Dagster will use the default partition mapping, which is the <PyObject object="TimeWindowPartitionMapping"/> for time window partitions definitions and the <PyObject object="IdentityPartitionMapping"/> for other partitions definitions. The <PyObject object="TimeWindowPartitionMapping"/> will map an upstream partition to the downstream partitions that overlap with it.

```python file=/concepts/partitions_schedules_sensors/sensors/asset_sensors.py startafter=start_daily_asset_to_weekly_asset endbefore=end_daily_asset_to_weekly_asset
@multi_asset_sensor(asset_keys=[AssetKey("upstream_daily_1")], job=weekly_asset_job)
@multi_asset_sensor(
monitored_assets=[AssetKey("upstream_daily_1")], job=weekly_asset_job
)
def trigger_weekly_asset_from_daily_asset(context):
run_requests_by_partition = {}
materializations_by_partition = context.latest_materialization_records_by_partition(
Expand Down
Expand Up @@ -68,7 +68,7 @@ def my_freshness_alerting_sensor(context: FreshnessPolicySensorContext):


@multi_asset_sensor(
asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")],
monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
job=my_job,
)
def asset_a_and_b_sensor(context):
Expand All @@ -84,7 +84,7 @@ def asset_a_and_b_sensor(context):


@multi_asset_sensor(
asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")],
monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
job=my_job,
)
def asset_a_and_b_sensor_with_skip_reason(context):
Expand Down Expand Up @@ -149,7 +149,9 @@ def downstream_weekly_asset():
# start_daily_asset_to_weekly_asset


@multi_asset_sensor(asset_keys=[AssetKey("upstream_daily_1")], job=weekly_asset_job)
@multi_asset_sensor(
monitored_assets=[AssetKey("upstream_daily_1")], job=weekly_asset_job
)
def trigger_weekly_asset_from_daily_asset(context):
run_requests_by_partition = {}
materializations_by_partition = context.latest_materialization_records_by_partition(
Expand Down Expand Up @@ -188,7 +190,10 @@ def trigger_weekly_asset_from_daily_asset(context):


@multi_asset_sensor(
asset_keys=[AssetKey("upstream_daily_1"), AssetKey("upstream_daily_2")],
monitored_assets=[
AssetKey("upstream_daily_1"),
AssetKey("upstream_daily_2"),
],
job=downstream_daily_job,
)
def trigger_daily_asset_if_both_upstream_partitions_materialized(context):
Expand All @@ -211,7 +216,10 @@ def trigger_daily_asset_if_both_upstream_partitions_materialized(context):

# start_multi_asset_sensor_OR
@multi_asset_sensor(
asset_keys=[AssetKey("upstream_daily_1"), AssetKey("upstream_daily_2")],
monitored_assets=[
AssetKey("upstream_daily_1"),
AssetKey("upstream_daily_2"),
],
job=downstream_daily_job,
)
def trigger_daily_asset_when_any_upstream_partitions_have_new_materializations(context):
Expand Down
Expand Up @@ -41,7 +41,7 @@ def my_repo():
instance = DagsterInstance.ephemeral()
materialize([asset_a, asset_b], instance=instance)
ctx = build_multi_asset_sensor_context(
asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")],
monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
instance=instance,
repository_def=my_repo,
)
Expand All @@ -51,7 +51,7 @@ def my_repo():
materialize([asset_c], instance=instance)

ctx = build_multi_asset_sensor_context(
asset_keys=[AssetKey("asset_c")],
monitored_assets=[AssetKey("asset_c")],
instance=instance,
repository_def=my_repo,
)
Expand All @@ -78,7 +78,10 @@ def test_multi_asset_sensor_AND():
partition_key="2022-08-01",
)
and_ctx = build_multi_asset_sensor_context(
asset_keys=[AssetKey("upstream_daily_1"), AssetKey("upstream_daily_2")],
monitored_assets=[
AssetKey("upstream_daily_1"),
AssetKey("upstream_daily_2"),
],
instance=instance,
repository_def=my_repo,
)
Expand Down Expand Up @@ -116,7 +119,10 @@ def test_multi_asset_sensor_OR():
partition_key="2022-08-01",
)
or_ctx = build_multi_asset_sensor_context(
asset_keys=[AssetKey("upstream_daily_1"), AssetKey("upstream_daily_2")],
monitored_assets=[
AssetKey("upstream_daily_1"),
AssetKey("upstream_daily_2"),
],
instance=instance,
repository_def=my_repo,
)
Expand Down Expand Up @@ -151,7 +157,7 @@ def test_multi_asset_sensor_weekly_from_daily():
]:
materialize([upstream_daily_1], instance=instance, partition_key=date)
ctx = build_multi_asset_sensor_context(
asset_keys=[AssetKey("upstream_daily_1")],
monitored_assets=[AssetKey("upstream_daily_1")],
instance=instance,
repository_def=my_repo,
)
Expand Down
Expand Up @@ -56,7 +56,7 @@ def log_asset_sensor_job():


@multi_asset_sensor(
asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")],
monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
job=log_asset_sensor_job,
)
def asset_a_and_b_sensor(context):
Expand All @@ -76,7 +76,7 @@ def asset_a_and_b_sensor(context):


@multi_asset_sensor(
asset_keys=[AssetKey("asset_c"), AssetKey("asset_d")],
monitored_assets=[AssetKey("asset_c"), AssetKey("asset_d")],
job=log_asset_sensor_job,
)
def asset_c_or_d_sensor(context):
Expand All @@ -96,7 +96,7 @@ def asset_c_or_d_sensor(context):


@multi_asset_sensor(
asset_keys=[AssetKey("my_string_asset"), AssetKey("my_int_asset")],
monitored_assets=[AssetKey("my_string_asset"), AssetKey("my_int_asset")],
job=log_asset_sensor_job,
)
def asset_string_and_int_sensor(context):
Expand All @@ -114,7 +114,7 @@ def asset_string_and_int_sensor(context):


@multi_asset_sensor(
asset_keys=[AssetKey("asset_a")],
monitored_assets=[AssetKey("asset_a")],
job=log_asset_sensor_job,
)
def every_fifth_materialization_sensor(context):
Expand Down
@@ -1,6 +1,7 @@
import collections.abc
import inspect
from functools import update_wrapper
from typing import Callable, Optional, Sequence
from typing import Callable, Optional, Sequence, Union

import dagster._check as check
from dagster._annotations import experimental
Expand Down Expand Up @@ -187,8 +188,7 @@ def _wrapped_fn(context, event):

@experimental
def multi_asset_sensor(
asset_keys: Optional[Sequence[AssetKey]] = None,
asset_selection: Optional[AssetSelection] = None,
monitored_assets: Union[Sequence[AssetKey], AssetSelection],
*,
job_name: Optional[str] = None,
name: Optional[str] = None,
Expand All @@ -208,17 +208,14 @@ def multi_asset_sensor(
2. Return a list of `RunRequest` objects.
3. Return a `SkipReason` object, providing a descriptive message of why no runs were requested.
4. Return nothing (skipping without providing a reason)
5. Yield a `SkipReason` or yield one ore more `RunRequest` objects.
5. Yield a `SkipReason` or yield one or more `RunRequest` objects.
Takes a :py:class:`~dagster.MultiAssetSensorEvaluationContext`.
Args:
asset_keys (Optional[Sequence[AssetKey]]): The asset keys this sensor monitors. If not
provided, asset_selection argument must be provided. To monitor assets that aren't defined
in the repository that this sensor is part of, you must use asset_keys.
asset_selection (Optional[AssetSelection]): The asset selection this sensor monitors. If not
provided, asset_keys argument must be provided. If you use asset_selection, all assets that
are part of the selection must be in the repository that this sensor is part of.
monitored_assets (Union[Sequence[AssetKey], AssetSelection]): The assets this
sensor monitors. If an AssetSelection object is provided, it is resolved only against
the assets within the Definitions that this sensor is part of. To monitor assets that
are defined elsewhere, pass a sequence of AssetKeys instead.
name (Optional[str]): The name of the sensor. Defaults to the name of the decorated
function.
minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
Expand All @@ -233,14 +230,22 @@ def multi_asset_sensor(
"""
check.opt_str_param(name, "name")

if not isinstance(monitored_assets, AssetSelection) and not (
isinstance(monitored_assets, collections.abc.Sequence)
and all(isinstance(el, AssetKey) for el in monitored_assets)
):
check.failed(
"The value passed to monitored_assets param must be either an AssetSelection"
f" or a Sequence of AssetKeys, but was a {type(monitored_assets)}"
)

def inner(fn: MultiAssetMaterializationFunction) -> MultiAssetSensorDefinition:
check.callable_param(fn, "fn")
sensor_name = name or fn.__name__

sensor_def = MultiAssetSensorDefinition(
name=sensor_name,
asset_keys=asset_keys,
asset_selection=asset_selection,
monitored_assets=monitored_assets,
job_name=job_name,
asset_materialization_fn=fn,
minimum_interval_seconds=minimum_interval_seconds,
Expand Down

0 comments on commit fdabcde

Please sign in to comment.