[asset-reconciliation] Perf improvements for reconciliation sensor (#…
OwenKephart committed Jan 18, 2023
1 parent bd7cb10 commit 85d0357
Showing 7 changed files with 110 additions and 202 deletions.
@@ -1,5 +1,4 @@
-from collections import deque
-from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union, cast
+from typing import TYPE_CHECKING, List, Optional, Sequence, Union, cast
 
 import graphene
 from dagster import (
@@ -377,33 +376,18 @@ def resolve_assetMaterializationUsedData(
         if not event_records:
             return []
 
-        parents_to_children: Dict[AssetKey, AssetKey] = dict()
-        queue = deque([[asset_key, None]])
-        while queue:
-            [current_key, child_key] = queue.popleft()
-            if not current_key or asset_graph.is_source(current_key):
-                continue
-            # just return the nearest 20 assets or all the immediate parents
-            if len(parents_to_children) > 20 and child_key != asset_key:
-                break
-            for parent_key in asset_graph.get_parents(current_key):
-                if parent_key not in parents_to_children:
-                    parents_to_children[parent_key] = current_key
-                    queue.append([parent_key, current_key])
-
         used_data_times = data_time_queryer.get_used_data_times_for_record(
             asset_graph=asset_graph,
             record=event_records[0],
-            upstream_keys=parents_to_children.keys(),
         )
 
         return [
             GrapheneMaterializationUpstreamDataVersion(
-                assetKey=asset_key,
-                downstreamAssetKey=parents_to_children[asset_key],
+                assetKey=used_asset_key,
+                downstreamAssetKey=asset_key,
                 timestamp=int(materialization_time.timestamp() * 1000),
             )
-            for asset_key, materialization_time in used_data_times.items()
+            for used_asset_key, materialization_time in used_data_times.items()
             if materialization_time
         ]
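For context on the deleted block: it was a breadth-first walk that collected roughly the nearest 20 upstream asset keys (plus all immediate parents) before querying used data times, and `get_used_data_times_for_record` now discovers the relevant upstream keys itself. A minimal standalone sketch of that removed traversal pattern, using a plain dict as a toy asset graph (all names here are hypothetical):

```python
from collections import deque
from typing import Dict, Set

# Toy upstream-edge map standing in for the real asset graph (hypothetical data).
PARENTS: Dict[str, Set[str]] = {
    "reporting": {"cleaned"},
    "cleaned": {"raw_a", "raw_b"},
}

def nearest_upstream(asset_key: str, limit: int = 20) -> Dict[str, str]:
    """BFS upward from asset_key, mapping each discovered parent to its child."""
    parents_to_children: Dict[str, str] = {}
    queue = deque([(asset_key, None)])
    while queue:
        current_key, child_key = queue.popleft()
        # Stop expanding once enough keys are found, but always keep immediate parents.
        if len(parents_to_children) > limit and child_key != asset_key:
            break
        for parent_key in PARENTS.get(current_key, set()):
            if parent_key not in parents_to_children:
                parents_to_children[parent_key] = current_key
                queue.append((parent_key, current_key))
    return parents_to_children

print(nearest_upstream("reporting"))
# e.g. {'cleaned': 'reporting', 'raw_a': 'cleaned', 'raw_b': 'cleaned'}
```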

@@ -434,18 +434,10 @@ def get_freshness_constraints_by_key(
             continue
         has_freshness_policy = True
         upstream_keys = asset_graph.get_non_source_roots(key)
-        latest_record = instance_queryer.get_latest_materialization_record(key)
-        used_data_times = (
-            instance_queryer.get_used_data_times_for_record(
-                asset_graph=asset_graph, record=latest_record, upstream_keys=upstream_keys
-            )
-            if latest_record is not None
-            else {upstream_key: None for upstream_key in upstream_keys}
-        )
         constraints_by_key[key] = freshness_policy.constraints_for_time_window(
             window_start=plan_window_start,
             window_end=plan_window_end,
-            used_data_times=used_data_times,
+            upstream_keys=frozenset(upstream_keys),
         )
 
     # no freshness policies, so don't bother with constraints
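The perf-relevant move in this hunk: building constraints previously fetched the latest materialization record and its used data times for every asset with a freshness policy on every evaluation, whereas `constraints_for_time_window` now needs only the frozen set of upstream keys, so the expensive data-time queries can happen later and only where a constraint is actually checked. A rough sketch of that deferral pattern (hypothetical stand-ins, not the exact dagster API):

```python
import datetime
from typing import Callable, FrozenSet, NamedTuple, Optional

class FreshnessConstraint(NamedTuple):
    asset_keys: FrozenSet[str]
    required_data_time: datetime.datetime
    required_by_time: datetime.datetime

def build_constraint(
    upstream_keys: FrozenSet[str],
    evaluation_tick: datetime.datetime,
    maximum_lag: datetime.timedelta,
) -> FreshnessConstraint:
    # Cheap: derived from static graph/policy information, no storage queries.
    return FreshnessConstraint(
        asset_keys=upstream_keys,
        required_data_time=evaluation_tick - maximum_lag,
        required_by_time=evaluation_tick,
    )

def is_satisfied(
    constraint: FreshnessConstraint,
    used_data_time: Callable[[str], Optional[datetime.datetime]],
) -> bool:
    # The expensive lookups happen here, only for constraints that get evaluated.
    times = [used_data_time(key) for key in constraint.asset_keys]
    return all(t is not None and t >= constraint.required_data_time for t in times)
```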
@@ -482,7 +474,6 @@ def get_current_data_times_for_key(
    else:
        return instance_queryer.get_used_data_times_for_record(
            asset_graph=asset_graph,
-            upstream_keys=relevant_upstream_keys,
            record=latest_record,
        )

@@ -496,7 +487,7 @@ def get_expected_data_times_for_key(
"""Returns the data times for the given asset key if this asset were to be executed in this
tick.
"""
expected_data_times: Dict[AssetKey, datetime.datetime] = {asset_key: current_time}
expected_data_times: Dict[AssetKey, Optional[datetime.datetime]] = {asset_key: current_time}

def _min_or_none(a, b):
if a is None or b is None:
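The `_min_or_none` helper is truncated by the collapsed diff, but its role is to combine two data times such that an unknown (None) time poisons the result. A minimal sketch of what the full helper presumably looks like:

```python
import datetime
from typing import Optional

def _min_or_none(
    a: Optional[datetime.datetime], b: Optional[datetime.datetime]
) -> Optional[datetime.datetime]:
    # If either input time is unknown, the combined data time is unknown.
    if a is None or b is None:
        return None
    return min(a, b)
```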
@@ -641,11 +632,12 @@ def determine_asset_partitions_to_reconcile_for_freshness(
             # cannot execute this asset, so if something consumes it, it should expect to
             # receive the current contents of the asset
             execution_window_start = None
+            expected_data_times: Mapping[AssetKey, Optional[datetime.datetime]] = {}
         else:
             # calculate the data times you would expect after all currently-executing runs
             # were to successfully complete
             in_progress_data_times = instance_queryer.get_in_progress_data_times_for_key(
-                asset_graph, key, relevant_upstream_keys, current_time
+                asset_graph, key, current_time
             )
 
             # if the latest run for this asset failed, then calculate the data times you would
@@ -1,5 +1,5 @@
 import datetime
-from typing import AbstractSet, Mapping, NamedTuple, Optional
+from typing import AbstractSet, FrozenSet, Mapping, NamedTuple, Optional
 
 import pendulum
 from croniter import croniter
@@ -94,7 +94,7 @@ def constraints_for_time_window(
         self,
         window_start: datetime.datetime,
         window_end: datetime.datetime,
-        used_data_times: Mapping[AssetKey, Optional[datetime.datetime]],
+        upstream_keys: FrozenSet[AssetKey],
     ) -> AbstractSet[FreshnessConstraint]:
         """For a given time window, calculate a set of FreshnessConstraints that this asset must
         satisfy.
@@ -104,9 +104,7 @@ def constraints_for_time_window(
                 calculated for. Generally, this is the current time.
             window_end (datetime): The end time of the window that constraints will be
                 calculated for.
-            used_data_times (Mapping[AssetKey, Optional[datetime]]): For each of the relevant
-                upstream assets, the timestamp of the data that was used to create the current
-                version of this asset.
+            upstream_keys (FrozenSet[AssetKey]): The relevant upstream keys for this policy.
         """
         constraints = set()

@@ -126,24 +124,17 @@

         # iterate over each schedule tick in the provided time window
         evaluation_tick = next(constraint_ticks, None)
-        upstream_keys = frozenset(used_data_times.keys())
-        used_data_time = None
-        for dt in used_data_times.values():
-            if dt is not None:
-                used_data_time = min(used_data_time or dt, dt)
         while evaluation_tick is not None and evaluation_tick < window_end:
             required_data_time = evaluation_tick - self.maximum_lag_delta
             required_by_time = evaluation_tick
 
-            # only add constraints if it is not currently fully satisfied
-            if used_data_time is None or used_data_time < required_data_time:
-                constraints.add(
-                    FreshnessConstraint(
-                        asset_keys=upstream_keys,
-                        required_data_time=required_data_time,
-                        required_by_time=required_by_time,
-                    )
-                )
+            constraints.add(
+                FreshnessConstraint(
+                    asset_keys=upstream_keys,
+                    required_data_time=required_data_time,
+                    required_by_time=required_by_time,
+                )
+            )
 
             evaluation_tick = next(constraint_ticks, None)
         # fallback if the user selects a very small maximum_lag_minutes value
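Note what the hunk above changed: the loop now emits a constraint for every schedule tick in the window unconditionally, since the `used_data_time` pre-filtering (which forced eager data-time queries) moved out of this method. A self-contained sketch of the tick iteration, with hypothetical window and lag values:

```python
import datetime
from croniter import croniter

window_start = datetime.datetime(2023, 1, 18, 0, 0)
window_end = datetime.datetime(2023, 1, 18, 6, 0)
maximum_lag_delta = datetime.timedelta(minutes=30)

# Iterate the hourly schedule ticks that fall inside the window.
ticks = croniter("0 * * * *", window_start)
evaluation_tick = ticks.get_next(datetime.datetime)
while evaluation_tick < window_end:
    required_data_time = evaluation_tick - maximum_lag_delta
    required_by_time = evaluation_tick
    print(f"data from >= {required_data_time} must arrive by {required_by_time}")
    evaluation_tick = ticks.get_next(datetime.datetime)
```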
18 changes: 11 additions & 7 deletions python_modules/dagster/dagster/_core/selector/subset_selector.py
@@ -252,13 +252,17 @@ def fetch_sources(
     A source is a node that has no upstream dependencies within the provided selection.
     It can have other dependencies outside of the selection.
     """
-    traverser = Traverser(graph)
-    sources = set()
-    for item in within_selection:
-        upstream = traverser.fetch_upstream(item, depth=MAX_NUM) & within_selection
-        if len(upstream) == 0 or upstream == {item}:
-            sources.add(item)
-    return sources
+    dp = {}
+
+    def has_upstream_within_selection(node):
+        if node not in dp:
+            dp[node] = any(
+                parent_node in within_selection or has_upstream_within_selection(parent_node)
+                for parent_node in graph["upstream"].get(node, set()) - {node}
+            )
+        return dp[node]
+
+    return {node for node in within_selection if not has_upstream_within_selection(node)}
 
 
 def fetch_connected_assets_definitions(
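The rewrite above is the headline perf change in `subset_selector.py`: the old version ran a full upstream traversal (`Traverser.fetch_upstream`) for every selected node, while the new version memoizes a boolean per node, so each node and edge is visited at most once across the whole selection. A standalone sketch of the same idea on a toy graph (graph contents hypothetical):

```python
from typing import AbstractSet, Dict, Mapping, Set

def fetch_sources(
    upstream: Mapping[str, AbstractSet[str]], within_selection: AbstractSet[str]
) -> Set[str]:
    """Return the selected nodes with no upstream dependency inside the selection."""
    dp: Dict[str, bool] = {}  # memo: node -> has some upstream node within the selection

    def has_upstream_within_selection(node: str) -> bool:
        # Assumes a DAG, as the original does: a cycle would recurse forever.
        if node not in dp:
            dp[node] = any(
                parent in within_selection or has_upstream_within_selection(parent)
                for parent in upstream.get(node, set()) - {node}
            )
        return dp[node]

    return {node for node in within_selection if not has_upstream_within_selection(node)}

print(fetch_sources({"b": {"a"}, "c": {"b"}}, {"b", "c", "d"}))  # -> {'b', 'd'}
```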
