Skip to content

Commit

Permalink
validate resources for assets passed directly to a repository (#8270)
Browse files Browse the repository at this point in the history
* validate resources for assets passed directly to a repository

* Fix lint errors

Co-authored-by: Sandy Ryza <sandy@elementl.com>
  • Loading branch information
dpeng817 and sryza committed Jun 8, 2022
1 parent d99815d commit 40d8268
Showing 1 changed file with 187 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
AssetKey,
DagsterInvalidDefinitionError,
DagsterInvariantViolationError,
IOManager,
JobDefinition,
PipelineDefinition,
ResourceDefinition,
SensorDefinition,
SolidDefinition,
SourceAsset,
Expand All @@ -20,11 +22,13 @@
define_asset_job,
fs_io_manager,
graph,
io_manager,
job,
lambda_solid,
op,
pipeline,
repository,
resource,
schedule,
sensor,
solid,
Expand Down Expand Up @@ -803,15 +807,20 @@ def my_repo():


def test_direct_assets():
foo = SourceAsset("foo")
@io_manager(required_resource_keys={"foo"})
def the_manager():
pass

@asset
foo_resource = ResourceDefinition.hardcoded_resource("foo")
foo = SourceAsset("foo", io_manager_def=the_manager, resource_defs={"foo": foo_resource})

@asset(resource_defs={"foo": foo_resource})
def asset1():
...
pass

@asset
def asset2():
...
pass

@repository
def my_repo():
Expand All @@ -822,6 +831,180 @@ def my_repo():
AssetKey(["asset1"]),
AssetKey(["asset2"]),
}
assert my_repo.get_all_jobs()[0].resource_defs["foo"] == foo_resource


def test_direct_asset_unsatified_resource():
@asset(required_resource_keys={"a"})
def asset1():
pass

with pytest.raises(
DagsterInvalidDefinitionError,
match="resource with key 'a' required by op 'asset1' was not provided.",
):

@repository
def my_repo():
return [asset1]


def test_direct_asset_unsatified_resource_transitive():
@resource(required_resource_keys={"b"})
def resource1():
pass

@asset(resource_defs={"a": resource1})
def asset1():
pass

with pytest.raises(
DagsterInvalidDefinitionError,
match="resource with key 'b' required by resource with key 'a' was not provided.",
):

@repository
def my_repo():
return [asset1]


def test_source_asset_unsatisfied_resource():
@io_manager(required_resource_keys={"foo"})
def the_manager():
pass

with pytest.raises(
DagsterInvariantViolationError,
match="Resource with key 'foo' required by resource with key 'foo__io_manager', but not provided.",
):

@repository
def the_repo():
return [SourceAsset("foo", io_manager_def=the_manager)]


def test_source_asset_unsatisfied_resource_transitive():
@io_manager(required_resource_keys={"foo"})
def the_manager():
pass

@resource(required_resource_keys={"bar"})
def foo_resource():
pass

with pytest.raises(
DagsterInvariantViolationError,
match="Resource with key 'bar' required by resource with key 'foo', but not provided.",
):

@repository
def the_repo():
return [
SourceAsset("foo", io_manager_def=the_manager, resource_defs={"foo": foo_resource})
]


def test_direct_asset_resource_conflicts():
@asset(resource_defs={"foo": ResourceDefinition.hardcoded_resource("1")})
def first():
pass

@asset(resource_defs={"foo": ResourceDefinition.hardcoded_resource("2")})
def second():
pass

with pytest.raises(
DagsterInvalidDefinitionError,
match="Conflicting versions of resource with key 'foo' were provided to different assets.",
):

@repository
def the_repo():
return [first, second]


def test_source_asset_resource_conflicts():
@asset(resource_defs={"foo": ResourceDefinition.hardcoded_resource("1")})
def the_asset():
pass

@io_manager(required_resource_keys={"foo"})
def the_manager():
pass

the_source = SourceAsset(
key=AssetKey("the_key"),
io_manager_def=the_manager,
resource_defs={"foo": ResourceDefinition.hardcoded_resource("2")},
)

with pytest.raises(
DagsterInvalidDefinitionError,
match="Conflicting versions of resource with key 'foo' were provided to different assets.",
):

@repository
def the_repo():
return [the_asset, the_source]

other_source = SourceAsset(
key=AssetKey("other_key"),
io_manager_def=the_manager,
resource_defs={"foo": ResourceDefinition.hardcoded_resource("3")},
)

with pytest.raises(
DagsterInvalidDefinitionError,
match="Conflicting versions of resource with key 'foo' were provided to different assets.",
):

@repository
def other_repo():
return [other_source, the_source]


def test_assets_different_io_manager_defs():
class MyIOManager(IOManager):
def handle_output(self, context, obj):
assert obj == 10

def load_input(self, context):
return 5

the_manager_used = []

@io_manager
def the_manager():
the_manager_used.append("yes")
return MyIOManager()

other_manager_used = []

@io_manager
def other_manager():
other_manager_used.append("yes")
return MyIOManager()

@asset(io_manager_def=the_manager)
def the_asset(the_source, other_source):
return the_source + other_source

@asset(io_manager_def=other_manager)
def other_asset(the_source, other_source):
return the_source + other_source

the_source = SourceAsset(key=AssetKey("the_source"), io_manager_def=the_manager)

other_source = SourceAsset(key=AssetKey("other_source"), io_manager_def=other_manager)

@repository
def the_repo():
return [the_asset, other_asset, the_source, other_source]

assert len(the_repo.get_all_jobs()) == 1
assert the_repo.get_all_jobs()[0].execute_in_process().success
assert len(the_manager_used) == 2
assert len(other_manager_used) == 2


def _create_graph_with_name(name):
Expand Down

0 comments on commit 40d8268

Please sign in to comment.