Skip to content

Commit

Permalink
Minimize workspace snapshot dependency from GrapheneAssetLatestInfo
Browse files Browse the repository at this point in the history
Summary:
The medium-term plan here is to have GrapheneAssetNode be the field that requires definition-level data, and the imperfectly named GrapheneAssetLatestInfo to handle all the fields that require access to the instance's stored asset and run records, rather than to the workspace snapshot.

In the short term, we still need the asset nodes in the case where the most recent run for an asset is in progress. But loading nodes only for those in-progress assets should still be a visible improvement over fetching the nodes for every asset on every query.
  • Loading branch information
gibsondan committed May 9, 2024
1 parent 8830211 commit 870fe0b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from dagster._core.storage.tags import TagType, get_tag_type

from .external import ensure_valid_config, get_external_job_or_raise
from .fetch_assets import (
get_asset_nodes,
)

if TYPE_CHECKING:
from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader
Expand Down Expand Up @@ -161,31 +164,25 @@ def get_run_ids(

def get_assets_latest_info(
graphene_info: "ResolveInfo",
step_keys_by_asset: Mapping[AssetKey, Sequence[str]],
asset_keys: AbstractSet[AssetKey],
asset_record_loader: "BatchAssetRecordLoader",
) -> Sequence["GrapheneAssetLatestInfo"]:
from dagster_graphql.implementation.fetch_assets import get_asset_nodes_by_asset_key

from ..schema.asset_graph import GrapheneAssetLatestInfo
from ..schema.logs.events import GrapheneMaterializationEvent
from ..schema.pipelines.pipeline import GrapheneRun

instance = graphene_info.context.instance

asset_keys = list(step_keys_by_asset.keys())

if not asset_keys:
return []

asset_nodes = get_asset_nodes_by_asset_key(graphene_info, set(asset_keys))

asset_records = asset_record_loader.get_asset_records(asset_keys)
asset_records = asset_record_loader.get_asset_records(list(asset_keys))

latest_materialization_by_asset = {
asset_record.asset_entry.asset_key: (
GrapheneMaterializationEvent(event=asset_record.asset_entry.last_materialization)
if asset_record.asset_entry.last_materialization
and asset_record.asset_entry.asset_key in step_keys_by_asset
and asset_record.asset_entry.asset_key in asset_keys
else None
)
for asset_record in asset_records
Expand All @@ -198,7 +195,7 @@ def get_assets_latest_info(
asset_record.asset_entry.asset_key: (
asset_record.asset_entry.last_materialization.run_id
if asset_record.asset_entry.last_materialization
and asset_record.asset_entry.asset_key in step_keys_by_asset
and asset_record.asset_entry.asset_key in asset_keys
else None
)
for asset_record in asset_records
Expand All @@ -212,39 +209,30 @@ def get_assets_latest_info(
if asset_record.asset_entry.last_run_id
}

run_records_by_run_id = {}
in_progress_records = []
run_ids = list(set(latest_run_ids_by_asset.values())) if latest_run_ids_by_asset else []
if run_ids:
run_records = instance.get_run_records(RunsFilter(run_ids=run_ids))
for run_record in run_records:
if run_record.dagster_run.status in PENDING_STATUSES:
in_progress_records.append(run_record)
run_records_by_run_id[run_record.dagster_run.run_id] = run_record
run_records = instance.get_run_records(RunsFilter(run_ids=run_ids)) if run_ids else []

run_records_by_run_id = {}

for run_record in run_records:
run_records_by_run_id[run_record.dagster_run.run_id] = run_record

(
in_progress_run_ids_by_asset,
unstarted_run_ids_by_asset,
) = _get_in_progress_runs_for_assets(
graphene_info,
in_progress_records,
step_keys_by_asset,
asset_keys,
run_records_by_run_id,
latest_materialization_run_id_by_asset,
latest_run_ids_by_asset,
)

from .fetch_assets import get_unique_asset_id

return [
GrapheneAssetLatestInfo(
(
get_unique_asset_id(
asset_key,
asset_nodes[asset_key].repository_location.name,
asset_nodes[asset_key].external_repository.name,
)
if asset_nodes[asset_key]
else get_unique_asset_id(asset_key)
),
get_unique_asset_id(asset_key),
asset_key,
latest_materialization_by_asset.get(asset_key),
list(unstarted_run_ids_by_asset.get(asset_key, [])),
Expand All @@ -258,16 +246,42 @@ def get_assets_latest_info(
else None
),
)
for asset_key in step_keys_by_asset.keys()
for asset_key in asset_keys
]


def _get_in_progress_runs_for_assets(
graphene_info: "ResolveInfo",
in_progress_records: Sequence[RunRecord],
step_keys_by_asset: Mapping[AssetKey, Sequence[str]],
asset_keys: AbstractSet[AssetKey],
run_records_by_run_id: Dict[str, RunRecord],
latest_materialization_run_id_by_asset: Dict[AssetKey, Optional[str]],
latest_run_ids_by_asset: Dict[AssetKey, str],
) -> Tuple[Mapping[AssetKey, AbstractSet[str]], Mapping[AssetKey, AbstractSet[str]]]:
in_progress_run_ids = set()

for run_id, run_record in run_records_by_run_id.items():
if run_record.dagster_run.status in PENDING_STATUSES:
in_progress_run_ids.add(run_id)

# Load nodes for the asset keys whose most recent run is in progress
asset_keys_with_in_progress_runs = {
asset_key
for asset_key in asset_keys
if latest_run_ids_by_asset.get(asset_key) in in_progress_run_ids
}

asset_nodes = (
get_asset_nodes(graphene_info, asset_keys_with_in_progress_runs)
if asset_keys_with_in_progress_runs
else {}
)

# Build mapping of asset key to the step keys required to generate the asset
step_keys_by_asset = {
node.external_asset_node.asset_key: node.external_asset_node.op_names
for node in asset_nodes
}

# Build mapping of step key to the assets it generates
asset_key_by_step_key = defaultdict(set)
for asset_key, step_keys in step_keys_by_asset.items():
Expand All @@ -277,7 +291,8 @@ def _get_in_progress_runs_for_assets(
in_progress_run_ids_by_asset = defaultdict(set)
unstarted_run_ids_by_asset = defaultdict(set)

for record in in_progress_records:
for run_id in in_progress_run_ids:
record = run_records_by_run_id[run_id]
run = record.dagster_run
asset_selection = run.asset_selection
run_step_keys = graphene_info.context.instance.get_execution_plan_snapshot(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Mapping, Optional, Sequence, cast
from typing import Any, List, Mapping, Optional, Sequence, cast

import dagster._check as check
import graphene
Expand Down Expand Up @@ -1076,20 +1076,10 @@ def resolve_assetsLatestInfo(
):
asset_keys = set(AssetKey.from_graphql_input(asset_key) for asset_key in assetKeys)

results = get_asset_nodes(graphene_info, asset_keys)

# Filter down to requested asset keys
# Build mapping of asset key to the step keys required to generate the asset
step_keys_by_asset: Dict[AssetKey, Sequence[str]] = {
node.external_asset_node.asset_key: node.external_asset_node.op_names
for node in results
if node.assetKey in asset_keys
}

asset_record_loader = graphene_info.context.asset_record_loader
asset_record_loader.add_asset_keys(asset_keys)

return get_assets_latest_info(graphene_info, step_keys_by_asset, asset_record_loader)
return get_assets_latest_info(graphene_info, asset_keys, asset_record_loader)

@capture_error
def resolve_logsForRun(
Expand Down

0 comments on commit 870fe0b

Please sign in to comment.