Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Definitions.merge and don't immediately validate definitions #21746

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 169 additions & 18 deletions python_modules/dagster/dagster/_core/definitions/definitions_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,15 @@ class Definitions:
Any other object is coerced to a :py:class:`ResourceDefinition`.
"""

# Raw, unvalidated definitions as supplied to __init__. Cross-definition
# validation is deferred until a repository is constructed from them.
_assets: Iterable[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]
# Schedules may be unresolved (partitioned-asset schedules resolve later).
_schedules: Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]
_sensors: Iterable[SensorDefinition]
# Jobs may be unresolved asset jobs that resolve against the asset graph.
_jobs: Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]
# Values are coerced to ResourceDefinition when the repository is built.
_resources: Mapping[str, Any]
_executor: Optional[Union[ExecutorDefinition, Executor]]
_loggers: Mapping[str, LoggerDefinition]
_asset_checks: Iterable[AssetChecksDefinition]

def __init__(
    self,
    assets: Optional[
        Iterable[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]
    ] = None,
    schedules: Optional[
        Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]
    ] = None,
    sensors: Optional[Iterable[SensorDefinition]] = None,
    jobs: Optional[Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]] = None,
    resources: Optional[Mapping[str, Any]] = None,
    executor: Optional[Union[ExecutorDefinition, Executor]] = None,
    loggers: Optional[Mapping[str, LoggerDefinition]] = None,
    asset_checks: Optional[Iterable[AssetChecksDefinition]] = None,
):
    """Store the supplied definitions after per-argument type validation.

    Only shallow type checks happen here; cross-definition validation
    (conflicting keys/names, resource resolution) is deferred until a
    repository is built via ``get_inner_repository``/``get_repository_def``.

    NOTE(review): the diff collapsed part of the original signature; the
    parameter list above is reconstructed from the attribute declarations
    and body — confirm against the full file.
    """
    self._assets = check.opt_iterable_param(
        assets,
        "assets",
        (AssetsDefinition, SourceAsset, CacheableAssetsDefinition),
    )
    self._schedules = check.opt_iterable_param(
        schedules,
        "schedules",
        (ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition),
    )
    self._sensors = check.opt_iterable_param(sensors, "sensors", SensorDefinition)
    self._jobs = check.opt_iterable_param(
        jobs, "jobs", (JobDefinition, UnresolvedAssetJobDefinition)
    )
    self._asset_checks = check.opt_iterable_param(
        asset_checks, "asset_checks", AssetChecksDefinition
    )
    self._resources = check.opt_mapping_param(resources, "resources", key_type=str)
    self._executor = check.opt_inst_param(executor, "executor", (ExecutorDefinition, Executor))
    self._loggers = check.opt_mapping_param(
        loggers, "loggers", key_type=str, value_type=LoggerDefinition
    )

@property
def assets(self) -> Iterable[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]:
    """The assets (including source and cacheable assets) supplied at construction time."""
    return self._assets

@property
def schedules(
    self,
) -> Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]:
    """The schedules (possibly unresolved) supplied at construction time."""
    return self._schedules

@property
def sensors(self) -> Iterable[SensorDefinition]:
    """The sensors supplied at construction time."""
    return self._sensors

@property
def jobs(self) -> Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]:
    """The jobs (possibly unresolved asset jobs) supplied at construction time."""
    return self._jobs

@property
def resources(self) -> Mapping[str, Any]:
    """The raw (not yet coerced) resource mapping supplied at construction time."""
    return self._resources

@property
def executor(self) -> Optional[Union[ExecutorDefinition, Executor]]:
    """The executor supplied at construction time, if any."""
    return self._executor

@property
def loggers(self) -> Mapping[str, LoggerDefinition]:
    """The logger definitions supplied at construction time."""
    return self._loggers

@property
def asset_checks(self) -> Iterable[AssetChecksDefinition]:
    """The asset checks supplied at construction time."""
    return self._asset_checks

@public
def get_job_def(self, name: str) -> JobDefinition:
def get_repository_def(self) -> RepositoryDefinition:
    """Return the RepositoryDefinition that backs this Definitions object.

    Definitions is implemented internally as a RepositoryDefinition; this
    method exposes that underlying object in order to access any
    functionality that is not exposed on Definitions. It also resolves a
    PendingRepositoryDefinition to a RepositoryDefinition.
    """
    inner_repository = self.get_inner_repository()
    # A pending repository (one containing CacheableAssetsDefinitions) must
    # be computed before it can be used as a RepositoryDefinition.
    return (
        inner_repository.compute_repository_definition()
        if isinstance(inner_repository, PendingRepositoryDefinition)
        else inner_repository
    )

@cached_method
def get_inner_repository(
    self,
) -> Union[RepositoryDefinition, PendingRepositoryDefinition]:
    """Build (and cache) the repository backing this Definitions object.

    This method is used internally to access the inner repository. We
    explicitly do not want to resolve the pending repo because the entire
    point is to defer that resolution until later.
    """
    # Construction performs all cross-definition validation that __init__
    # deliberately skips; @cached_method ensures it runs at most once.
    return _create_repository_using_definitions_args(
        name=SINGLETON_REPOSITORY_NAME,
        assets=self._assets,
        schedules=self._schedules,
        sensors=self._sensors,
        jobs=self._jobs,
        resources=self._resources,
        executor=self._executor,
        loggers=self._loggers,
        asset_checks=self._asset_checks,
    )

def get_asset_graph(self) -> AssetGraph:
    """Return the AssetGraph derived from this set of definitions."""
    repository = self.get_repository_def()
    return repository.asset_graph

@public
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this PR proposes making this method public and non-experimental. This is to avoid a regression in users' ability to validate their definitions.

So somewhat high stakes.

def validate_loadable(self) -> "Definitions":
    """Check that the enclosed definitions will be loadable by Dagster.

    Loadability means:
    - No assets have conflicting keys.
    - No jobs, sensors, or schedules have conflicting names.
    - All asset jobs can be resolved.
    - All resource requirements are satisfied.

    Raises an error if any of the above are not true.

    Returns:
        Definitions: This same object, unmodified (allows call chaining).
    """
    # Building the inner repository performs all of the validation; the
    # result itself is discarded here (it is cached for later use).
    self.get_inner_repository()
    return self
Comment on lines +644 to +657
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely not a fan of this pattern where a user is just magically expected to call this and it is unencoded in the type system. I'd prefer a top-level utility function instead of a method. The existence of this sort of validate method implies that a user is supposed to call it before using the definitions, but that is totally unenforced here.

A top-level staticmethod would be preferable in my view.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A top-level staticmethod would be preferable in my view.

Sounds good - I'll give that a whirl.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw I'm imagining the main use case for this method is in unit tests. I.e. I'm not expecting that everyone would call it in their definitions.py.


@staticmethod
def merge(*def_sets: "Definitions") -> "Definitions":
    """Merge multiple Definitions objects into a single Definitions object.

    The returned Definitions object has the union of all the definitions in
    the input Definitions objects: assets, asset checks, schedules, sensors,
    and jobs are concatenated; resource and logger mappings are unioned.

    Args:
        *def_sets: The Definitions objects to merge.

    Returns:
        Definitions: The merged definitions.

    Raises:
        DagsterInvariantViolationError: If two inputs define a resource or
            logger under the same key, or if two inputs supply different
            executors.
    """
    check.sequence_param(def_sets, "def_sets", of_type=Definitions)

    assets = []
    schedules = []
    sensors = []
    jobs = []
    asset_checks = []

    # Track which input supplied each key/executor so conflict errors can
    # point at both offending Definitions objects.
    resources = {}
    resource_key_indexes: Dict[str, int] = {}
    loggers = {}
    logger_key_indexes: Dict[str, int] = {}
    executor = None
    executor_index: Optional[int] = None

    for i, def_set in enumerate(def_sets):
        assets.extend(def_set.assets or [])
        asset_checks.extend(def_set.asset_checks or [])
        schedules.extend(def_set.schedules or [])
        sensors.extend(def_set.sensors or [])
        jobs.extend(def_set.jobs or [])

        for resource_key, resource_value in (def_set.resources or {}).items():
            if resource_key in resources:
                raise DagsterInvariantViolationError(
                    f"Definitions objects {resource_key_indexes[resource_key]} and {i} both define a "
                    f"resource with key '{resource_key}'"
                )
            resources[resource_key] = resource_value
            resource_key_indexes[resource_key] = i

        for logger_key, logger_value in (def_set.loggers or {}).items():
            if logger_key in loggers:
                raise DagsterInvariantViolationError(
                    f"Definitions objects {logger_key_indexes[logger_key]} and {i} both define a "
                    f"logger with key '{logger_key}'"
                )
            loggers[logger_key] = logger_value
            logger_key_indexes[logger_key] = i

        if def_set.executor is not None:
            # Identical executors supplied twice are tolerated; only a
            # genuinely different second executor is a conflict.
            if executor is not None and executor != def_set.executor:
                raise DagsterInvariantViolationError(
                    f"Definitions objects {executor_index} and {i} both include an executor"
                )

            executor = def_set.executor
            # BUG FIX: executor_index was never assigned, so the conflict
            # error above always reported "None" instead of the index of
            # the Definitions object that first supplied an executor.
            executor_index = i

    return Definitions(
        assets=assets,
        schedules=schedules,
        sensors=sensors,
        jobs=jobs,
        resources=resources,
        executor=executor,
        loggers=loggers,
        asset_checks=asset_checks,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ def repository_def_from_target_def(

if isinstance(target, Definitions):
# reassign to handle both repository and pending repo case
target = target.get_inner_repository_for_loading_process()
target = target.get_inner_repository()

# special case - we can wrap a single job in a repository
if isinstance(target, (JobDefinition, GraphDefinition)):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def asset2(asst1): ...
rf"\n\t{re.escape(asset1.key.to_string())}"
),
):
Definitions(assets=[asset1, asset2])
Definitions(assets=[asset1, asset2]).validate_loadable()


def test_typo_upstream_asset_no_similar() -> None:
Expand All @@ -59,7 +59,7 @@ def asset2(not_close_to_asset1): ...
r" ops and is not one of the provided sources."
),
):
Definitions(assets=[asset1, asset2])
Definitions(assets=[asset1, asset2]).validate_loadable()


def test_typo_upstream_asset_many_similar() -> None:
Expand All @@ -85,7 +85,7 @@ def asset2(asst1): ...
rf" {re.escape(asst.key.to_string())}"
),
):
Definitions(assets=[asst, asset1, assets1, asset2])
Definitions(assets=[asst, asset1, assets1, asset2]).validate_loadable()


def test_typo_upstream_asset_wrong_prefix() -> None:
Expand All @@ -103,7 +103,7 @@ def asset2(asset1): ...
rf"\n\t{re.escape(asset1.key.to_string())}"
),
):
Definitions(assets=[asset1, asset2])
Definitions(assets=[asset1, asset2]).validate_loadable()


def test_typo_upstream_asset_wrong_prefix_and_wrong_key() -> None:
Expand All @@ -122,7 +122,7 @@ def asset2(asset1): ...
r" not one of the provided sources."
),
):
Definitions(assets=[asset1, asset2])
Definitions(assets=[asset1, asset2]).validate_loadable()


def test_one_off_component_prefix() -> None:
Expand All @@ -141,7 +141,7 @@ def asset2(asset1): ...
rf"\n\t{re.escape(asset1.key.to_string())}"
),
):
Definitions(assets=[asset1, asset2])
Definitions(assets=[asset1, asset2]).validate_loadable()

# One fewer component in the prefix
@asset(ins={"asset1": AssetIn(key=AssetKey(["my", "asset1"]))})
Expand All @@ -155,7 +155,7 @@ def asset3(asset1): ...
rf"\n\t{re.escape(asset1.key.to_string())}"
),
):
Definitions(assets=[asset1, asset3])
Definitions(assets=[asset1, asset3]).validate_loadable()


def test_accidentally_using_slashes() -> None:
Expand All @@ -174,7 +174,7 @@ def asset2(asset1): ...
rf"\n\t{re.escape(asset1.key.to_string())}"
),
):
Definitions(assets=[asset1, asset2])
Definitions(assets=[asset1, asset2]).validate_loadable()


NUM_ASSETS_TO_TEST_PERF = 5000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,4 @@ def foo(bar: Bar) -> DataVersion:
assets=[foo],
jobs=[define_asset_job("source_asset_job", [foo])],
resources=resource_defs,
)
).validate_loadable()
Loading