Skip to content

Commit

Permalink
Fix config case for default executor (#8777)
Browse files Browse the repository at this point in the history
* Fix config case for default executor

* Fix args to resolve
  • Loading branch information
dpeng817 authored and prha committed Jul 7, 2022
1 parent 4058bb5 commit 87e291e
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
Expand Up @@ -609,6 +609,7 @@ def from_dict(repository_definitions: Dict[str, Dict[str, Any]]) -> "CachingRepo
# TODO: https://github.com/dagster-io/dagster/issues/8263
assets=[],
source_assets=[],
executor_def=None,
)
elif not isinstance(job, JobDefinition) and not isfunction(job):
raise DagsterInvalidDefinitionError(
Expand Down Expand Up @@ -776,6 +777,7 @@ def from_list(
resolved_job = unresolved_job_def.resolve(
assets=combined_asset_group.assets,
source_assets=combined_asset_group.source_assets,
executor_def=default_executor_def,
)
pipelines_or_jobs[name] = resolved_job

Expand Down
Expand Up @@ -13,6 +13,7 @@
from dagster.core.definitions import (
AssetSelection,
AssetsDefinition,
ExecutorDefinition,
JobDefinition,
PartitionSetDefinition,
PartitionedConfig,
Expand Down Expand Up @@ -104,7 +105,10 @@ def run_request_for_partition(
return RunRequest(run_key=run_key, run_config=run_config, tags=run_request_tags)

def resolve(
self, assets: Sequence["AssetsDefinition"], source_assets: Sequence["SourceAsset"]
self,
assets: Sequence["AssetsDefinition"],
source_assets: Sequence["SourceAsset"],
executor_def: Optional["ExecutorDefinition"] = None,
) -> "JobDefinition":
"""
Resolve this UnresolvedAssetJobDefinition into a JobDefinition.
Expand All @@ -118,6 +122,7 @@ def resolve(
tags=self.tags,
asset_selection=self.selection.resolve([*assets, *source_assets]),
partitions_def=self.partitions_def,
executor_def=executor_def,
)


Expand Down
Expand Up @@ -1416,3 +1416,21 @@ def asset2():
@repository
def assets_repo():
return [layer_2]


def test_default_executor_config():
@asset
def some_asset():
pass

@repository(default_executor_def=in_process_executor)
def the_repo():
# The config provided to the_job matches in_process_executor, but not the default executor.
return [
define_asset_job(
"the_job", config={"execution": {"config": {"retries": {"enabled": {}}}}}
),
some_asset,
]

assert the_repo.get_job("the_job").executor_def == in_process_executor

0 comments on commit 87e291e

Please sign in to comment.