Skip to content

Commit

Permalink
[asset-reconciliation] Limit the partitions that can be reconciled with the sensor (#11582)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jan 17, 2023
1 parent 627f67c commit 396a1b8
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dagster._annotations import experimental
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.freshness_policy import FreshnessConstraint
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster._core.storage.tags import PARTITION_NAME_TAG
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer

Expand Down Expand Up @@ -264,6 +265,33 @@ def find_never_materialized_or_requested_root_asset_partitions(
)


def candidates_unit_within_allowable_time_window(
    asset_graph: AssetGraph, candidates_unit: Iterable[AssetKeyPartitionKey]
) -> bool:
    """Return whether a unit of candidates is recent enough to be reconciled.

    A given time-window partition may only be materialized if its window ends strictly less
    than 1 day before the end of the latest window of its partitions definition.

    Only the first candidate yielded by ``candidates_unit`` is inspected; it is treated as
    representative of the whole unit. Because that candidate is obtained with
    ``next(iter(...))``, passing a one-shot iterator consumes its first element, so callers
    that iterate ``candidates_unit`` again afterwards should pass a re-iterable collection.

    Args:
        asset_graph: Graph used to look up the partitions definition of the candidate's asset.
        candidates_unit: The asset partitions being considered together.

    Returns:
        ``True`` if the unit is empty, if the representative candidate has no partition key,
        or if its asset is not time-window partitioned (the restriction does not apply);
        ``False`` if the partitions definition has no latest window; otherwise whether the
        candidate's window ends less than one day before the latest window ends.
    """
    representative_candidate = next(iter(candidates_unit), None)
    if representative_candidate is None:
        # Nothing to restrict in an empty unit.
        return True

    partitions_def = asset_graph.get_partitions_def(representative_candidate.asset_key)
    partition_key = representative_candidate.partition_key
    if not isinstance(partitions_def, TimeWindowPartitionsDefinition) or not partition_key:
        # The time-window restriction only applies to time-window partitioned assets.
        return True

    # NOTE(review): no explicit time is passed here, so the latest window is presumably
    # computed relative to the current time -- confirm against the partitions definition.
    latest_partition_window = partitions_def.get_last_partition_window()
    if latest_partition_window is None:
        # No window exists yet, so no partition can be within the allowable range.
        return False

    candidate_partition_window = partitions_def.time_window_for_partition_key(partition_key)
    time_delta = latest_partition_window.end - candidate_partition_window.end

    # Strict comparison: a window ending exactly one day before the latest one is excluded.
    return time_delta < datetime.timedelta(days=1)


def determine_asset_partitions_to_reconcile(
instance_queryer: CachingInstanceQueryer,
cursor: AssetReconciliationCursor,
Expand Down Expand Up @@ -321,6 +349,9 @@ def should_reconcile(
candidates_unit: Iterable[AssetKeyPartitionKey],
to_reconcile: AbstractSet[AssetKeyPartitionKey],
) -> bool:
if not candidates_unit_within_allowable_time_window(asset_graph, candidates_unit):
return False

if any(
candidate in eventual_asset_partitions_to_reconcile_for_freshness
or candidate.asset_key not in target_asset_keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
reconcile,
)
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.time_window_partitions import HourlyPartitionsDefinition
from dagster._core.storage.tags import PARTITION_NAME_TAG
from dagster._seven.compat.pendulum import create_pendulum_time

Expand Down Expand Up @@ -321,6 +322,7 @@ def get_downstream_partitions_for_partition_range(


# Partitions definitions reused by the asset fixtures and scenarios in this module.
# The date strings are presumably the start of each definition's partition range -- confirm.
daily_partitions_def = DailyPartitionsDefinition("2013-01-05")
hourly_partitions_def = HourlyPartitionsDefinition("2013-01-05-00:00")
# Static partition sets with one, two, and three fixed partition keys respectively.
one_partition_partitions_def = StaticPartitionsDefinition(["a"])
two_partitions_partitions_def = StaticPartitionsDefinition(["a", "b"])
fanned_out_partitions_def = StaticPartitionsDefinition(["a_1", "a_2", "a_3"])
Expand Down Expand Up @@ -481,6 +483,15 @@ def get_downstream_partitions_for_partition_range(
]
# A single asset that uses the daily partitions definition.
one_asset_daily_partitions = [asset_def("asset1", partitions_def=daily_partitions_def)]

# An hourly-partitioned asset plus a daily-partitioned asset named "daily".
# NOTE(review): the ["hourly"] argument presumably declares "hourly" as the upstream
# dependency of "daily" -- confirm against asset_def's signature.
hourly_to_daily_partitions = [
asset_def("hourly", partitions_def=hourly_partitions_def),
asset_def(
"daily",
["hourly"],
partitions_def=daily_partitions_def,
),
]

partitioned_after_non_partitioned = [
asset_def("asset1"),
asset_def(
Expand Down Expand Up @@ -726,7 +737,6 @@ def get_downstream_partitions_for_partition_range(
unevaluated_runs=[],
current_time=create_pendulum_time(year=2013, month=1, day=7, hour=4),
expected_run_requests=[
run_request(asset_keys=["asset1"], partition_key="2013-01-05"),
run_request(asset_keys=["asset1"], partition_key="2013-01-06"),
],
),
Expand All @@ -735,9 +745,34 @@ def get_downstream_partitions_for_partition_range(
unevaluated_runs=[],
current_time=create_pendulum_time(year=2015, month=1, day=7, hour=4),
expected_run_requests=[
run_request(asset_keys=["asset1"], partition_key=partition_key)
for partition_key in daily_partitions_def.get_partition_keys(
current_time=create_pendulum_time(year=2015, month=1, day=7, hour=4)
run_request(asset_keys=["asset1"], partition_key="2015-01-06"),
],
),
"hourly_to_daily_partitions_never_materialized": AssetReconciliationScenario(
assets=hourly_to_daily_partitions,
unevaluated_runs=[],
current_time=create_pendulum_time(year=2013, month=1, day=7, hour=4),
expected_run_requests=[
run_request(asset_keys=["hourly"], partition_key=partition_key)
for partition_key in hourly_partitions_def.get_partition_keys_in_range(
PartitionKeyRange(start="2013-01-06-04:00", end="2013-01-07-03:00")
)
],
),
"hourly_to_daily_partitions_never_materialized2": AssetReconciliationScenario(
assets=hourly_to_daily_partitions,
unevaluated_runs=[
run(["hourly"], partition_key=partition_key)
for partition_key in hourly_partitions_def.get_partition_keys_in_range(
PartitionKeyRange(start="2013-01-06-00:00", end="2013-01-06-23:00")
)
],
current_time=create_pendulum_time(year=2013, month=1, day=7, hour=4),
expected_run_requests=[run_request(asset_keys=["daily"], partition_key="2013-01-06")]
+ [
run_request(asset_keys=["hourly"], partition_key=partition_key)
for partition_key in hourly_partitions_def.get_partition_keys_in_range(
PartitionKeyRange(start="2013-01-07-00:00", end="2013-01-07-03:00")
)
],
),
Expand Down Expand Up @@ -847,7 +882,7 @@ def get_downstream_partitions_for_partition_range(
assets=one_asset_self_dependency,
unevaluated_runs=[],
expected_run_requests=[run_request(asset_keys=["asset1"], partition_key="2020-01-01")],
current_time=create_pendulum_time(year=2020, month=1, day=3, hour=4),
current_time=create_pendulum_time(year=2020, month=1, day=2, hour=4),
),
"self_dependency_prior_partition_requested": AssetReconciliationScenario(
assets=one_asset_self_dependency,
Expand Down

0 comments on commit 396a1b8

Please sign in to comment.