Skip to content

Commit

Permalink
[bugfix] fix projected logical version resolution for asset downstrea…
Browse files Browse the repository at this point in the history
…m of self-dep (#12443)

### Summary & Motivation

Makes the projected logical version of assets downstream of partitioned
assets "UNKNOWN". Previously being downstream of a self-dep asset caused
a bug during PLV computation.

Fixes #11600

### How I Tested These Changes

- Modified existing unit test
- Manually confirmed error reported in #11600 no longer present
  • Loading branch information
smackesey committed Feb 22, 2023
1 parent 74f70c9 commit 2fc07dc
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@
query AssetNodeQuery($pipelineSelector: PipelineSelector!, $assetKeys: [AssetKeyInput!]) {
assetNodes(pipeline: $pipelineSelector, assetKeys: $assetKeys) {
id
assetKey {
path
}
currentLogicalVersion
projectedLogicalVersion
assetMaterializations {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any, Mapping

from dagster import (
AssetIn,
DailyPartitionsDefinition,
Expand Down Expand Up @@ -83,9 +85,13 @@ def get_repo_with_partitioned_self_dep_asset():
def a(a):
del a

@asset
def b(a):
return a

@repository
def repo():
return [a]
return [a, b]

return repo

Expand All @@ -100,7 +106,8 @@ def test_partitioned_self_dep():
result = _fetch_logical_versions(context, repo)
assert result
assert result.data
assert result.data["assetNodes"][0]["projectedLogicalVersion"] is None
assert _get_asset_node("a", result)["projectedLogicalVersion"] is None
assert _get_asset_node("b", result)["projectedLogicalVersion"] == "UNKNOWN"


def _materialize_assets(context: WorkspaceRequestContext, repo: RepositoryDefinition):
Expand All @@ -125,3 +132,7 @@ def _fetch_logical_versions(context: WorkspaceRequestContext, repo: RepositoryDe
"pipelineSelector": selector,
},
)


def _get_asset_node(key: str, result: Any) -> Mapping[str, Any]:
return next((node for node in result.data["assetNodes"] if node["assetKey"]["path"] == [key]))
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ def _get_current_logical_version(self, *, key: AssetKey) -> LogicalVersion:

@cached_method
def _get_projected_logical_version(self, *, key: AssetKey) -> LogicalVersion:
if self.asset_graph.is_source(key):
if self.asset_graph.get_partitions_def(key):
return UNKNOWN_LOGICAL_VERSION
elif self.asset_graph.is_source(key):
event = self._instance.get_latest_logical_version_record(key, True)
if event:
version = (
Expand Down

0 comments on commit 2fc07dc

Please sign in to comment.