-
I have an asset check, and I want to perform some action in response to the result of that check. I can achieve this in dagster through the use of a sensor.

import json
from typing import Dict, Optional, Sequence

from dagster import (
    AssetCheckKey,
    AssetCheckSeverity,
    AssetChecksDefinition,
    JobDefinition,
    RunRequest,
    SensorEvaluationContext,
    build_sensor_for_freshness_checks,
    build_time_partition_freshness_checks,
    sensor,
)
# Represents the latest evaluation timestamp for each asset check. This allows us to avoid reacting to the same failure multiple times.
LatestEvaluationTimestampMapping = Dict[str, float]
# every day at 9am, the previous day's partition should be fresh.
# The return type here is a sequence of checks, which follows our build_x_checks model which all return
# sequences, but in practice this is a sequence with a single multi-asset-check definition (even if you pass in multiple assets).
freshness_checks: AssetChecksDefinition = build_time_partition_freshness_checks(
[my_daily_partitioned_assets_def], deadline_cron="0 9 * * *", timezone="...", severity=AssetCheckSeverity.WARN
)
# will handle the scheduling of the freshness check
freshness_sensor = build_sensor_for_freshness_checks(
freshness_checks=freshness_checks, name="..."
)
# performing some action based on freshness check results
def react_to_failed_checks_factory(check_keys: Sequence[AssetCheckKey], reaction_job: JobDefinition):
@sensor(job=reaction_job)
def react_to_asset_checks(context: SensorEvaluationContext) -> Optional[RunRequest]:
# Cursor here is a mapping from freshness check key to the timestamp of the last evaluation
# of that check that we've processed.
last_evaluation_timestamps = json.loads(context.cursor) if context.cursor else {}
# This API gives us a "summary record" for each asset check. A summary record
# contains the most recent execution record for the check.
summary_records = context.instance.event_log_storage.get_asset_check_summary_records(check_keys)
# here is where we accumulate the check keys to react to.
freshness_checks_to_react_to = []
for check_key in check_keys:
summary_record = summary_records[check_key]
last_check_execution_record = summary_record.last_check_execution_record
# If there is an execution of this check (possibly none if the check has never run),
# and it evaluated to failure, then check if we've processed this failure already.
if last_check_execution_record and last_check_execution_record.status.value == "FAILED":
last_eval_timestamp = last_evaluation_timestamps.get(check_key)
# If we haven't processed this failure yet, then we should react to it.
if last_eval_timestamp is None or last_check_execution_record.create_timestamp > last_eval_timestamp:
freshness_checks_to_react_to.append(check_key)
last_evaluation_timestamps[check_key] = last_check_execution_record.create_timestamp
# Do something to convert the freshness check keys into a run request, which will kick off
# the reaction job. This probably means conversion into config.
...
# Update the cursor
context.update_cursor(json.dumps(last_evaluation_timestamps))
return react_to_freshness_checks
my_reaction_sensor = react_to_failed_checks_factory(
check_keys=[freshness_check.check_key], reaction_job=some_reaction_job
) Common use cases here would be opening incidents in datadog, creating some sort of custom alert message, etc. |
Beta Was this translation helpful? Give feedback.
Answered by
dpeng817
May 30, 2024
Replies: 1 comment
-
Answer is in the description.
Beta Was this translation helpful? Give feedback.
0 replies
Answer selected by
dpeng817
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
answer in description