Skip to content

Commit

Permalink
Allow jobs to be passed in lazily to repository definitions (#7382)
Browse files Browse the repository at this point in the history
Summary:
This check was crashing when a job was passed in lazily.

It will still crash when a graph is passed in lazily (the assumption that repos consist of jobs not graphs appears to be pretty deep) but this will make it so that users can pass in a lambda that returns a job. The error is a bit odd, but is the same error you would get if you passed in a job function that returned some other random object that wasn't a job.
  • Loading branch information
gibsondan committed Apr 12, 2022
1 parent d52463c commit 4e9ee2e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import warnings
from abc import ABC, abstractmethod
from inspect import isfunction
from types import FunctionType
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -594,7 +595,7 @@ def from_dict(repository_definitions: Dict[str, Dict[str, Any]]) -> "CachingRepo

if isinstance(job, GraphDefinition):
repository_definitions["jobs"][key] = job.coerce_to_job()
elif not isinstance(job, JobDefinition):
elif not isinstance(job, JobDefinition) and not isfunction(job):
raise DagsterInvalidDefinitionError(
f"Object mapped to {key} is not an instance of JobDefinition or GraphDefinition."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
sensor,
solid,
)
from dagster.check import CheckError
from dagster.core.definitions.partition import PartitionedConfig, StaticPartitionsDefinition


Expand Down Expand Up @@ -465,6 +466,51 @@ def jobs():
assert jobs.get_job("other_graph")


def test_lazy_jobs():
@graph
def my_graph():
pass

@repository
def jobs():
return {
"jobs": {
"my_graph": my_graph,
"my_job": lambda: my_graph.to_job(name="my_job"),
"other_job": lambda: my_graph.to_job(name="other_job"),
}
}

assert jobs.get_pipeline("my_graph")
assert jobs.get_pipeline("my_job")
assert jobs.get_pipeline("other_job")

assert jobs.has_job("my_graph")
assert jobs.get_job("my_job")
assert jobs.get_job("other_job")


def test_lazy_graph():
@graph
def my_graph():
pass

@repository
def jobs():
return {
"jobs": {
"my_graph": lambda: my_graph,
}
}

# Repository with a lazy graph can be constructed, but fails when you try to fetch it
with pytest.raises(
CheckError,
match="Invariant failed. Description: Bad constructor for job my_graph: must return JobDefinition",
):
assert jobs.get_pipeline("my_graph")


def test_list_dupe_graph():
@graph
def foo():
Expand Down

0 comments on commit 4e9ee2e

Please sign in to comment.