Skip to content

Commit

Permalink
default executor on repo (#8272)
Browse files Browse the repository at this point in the history
* validate resources for assets passed directly to a repository

* Add default executor def to repo

* lint, isort

Co-authored-by: Sandy Ryza <sandy@elementl.com>
  • Loading branch information
dpeng817 and sryza committed Jun 9, 2022
1 parent b86fe4d commit f0b7346
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import dagster._check as check
from dagster.core.errors import DagsterInvalidDefinitionError

from ..executor_definition import ExecutorDefinition
from ..graph_definition import GraphDefinition
from ..partition import PartitionSetDefinition
from ..pipeline_definition import PipelineDefinition
Expand All @@ -19,9 +20,17 @@


class _Repository:
def __init__(self, name: Optional[str] = None, description: Optional[str] = None):
def __init__(
self,
name: Optional[str] = None,
description: Optional[str] = None,
default_executor_def: Optional[ExecutorDefinition] = None,
):
self.name = check.opt_str_param(name, "name")
self.description = check.opt_str_param(description, "description")
self.default_executor_def = check.opt_inst_param(
default_executor_def, "default_executor_def", ExecutorDefinition
)

def __call__(self, fn: Callable[[], Any]) -> RepositoryDefinition:
from dagster.core.asset_defs import AssetGroup, AssetsDefinition, SourceAsset
Expand Down Expand Up @@ -63,7 +72,9 @@ def __call__(self, fn: Callable[[], Any]) -> RepositoryDefinition:
"AssetsDefinition, or SourceAsset."
f"Got {bad_definitions_str}."
)
repository_data = CachingRepositoryData.from_list(repository_definitions)
repository_data = CachingRepositoryData.from_list(
repository_definitions, default_executor_def=self.default_executor_def
)

elif isinstance(repository_definitions, dict):
if not set(repository_definitions.keys()).issubset(VALID_REPOSITORY_DATA_DICT_KEYS):
Expand Down Expand Up @@ -91,7 +102,9 @@ def __call__(self, fn: Callable[[], Any]) -> RepositoryDefinition:
)

repository_def = RepositoryDefinition(
name=self.name, description=self.description, repository_data=repository_data
name=self.name,
description=self.description,
repository_data=repository_data,
)

update_wrapper(repository_def, fn)
Expand All @@ -109,7 +122,9 @@ def repository(name: Optional[str] = ..., description: Optional[str] = ...) -> _


def repository(
name: Optional[Union[str, Callable[..., Any]]] = None, description: Optional[str] = None
name: Optional[Union[str, Callable[..., Any]]] = None,
description: Optional[str] = None,
default_executor_def: Optional[ExecutorDefinition] = None,
) -> Union[RepositoryDefinition, _Repository]:
"""Create a repository from the decorated function.
Expand Down Expand Up @@ -242,4 +257,6 @@ def complex_repository():

return _Repository()(name)

return _Repository(name=name, description=description)
return _Repository(
name=name, description=description, default_executor_def=default_executor_def
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dagster.utils import merge_dicts

from .events import AssetKey
from .executor_definition import ExecutorDefinition
from .graph_definition import GraphDefinition, SubselectedGraphDefinition
from .job_definition import JobDefinition
from .partition import PartitionScheduleDefinition, PartitionSetDefinition
Expand Down Expand Up @@ -631,6 +632,7 @@ def from_list(
UnresolvedAssetJobDefinition,
]
],
default_executor_def: Optional[ExecutorDefinition] = None,
) -> "CachingRepositoryData":
"""Static constructor.
Expand Down Expand Up @@ -733,7 +735,11 @@ def from_list(
raise DagsterInvalidDefinitionError(
"A repository can't have both an AssetGroup and direct asset defs"
)
combined_asset_group = AssetGroup(assets=assets_defs, source_assets=source_assets)
combined_asset_group = AssetGroup(
assets=assets_defs,
source_assets=source_assets,
executor_def=default_executor_def,
)

if combined_asset_group:
for job_def in combined_asset_group.get_base_jobs():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
define_asset_job,
fs_io_manager,
graph,
in_process_executor,
io_manager,
job,
lambda_solid,
Expand All @@ -34,6 +35,7 @@
solid,
)
from dagster._check import CheckError
from dagster.core.definitions.executor_definition import multi_or_in_process_executor
from dagster.core.definitions.partition import PartitionedConfig, StaticPartitionsDefinition
from dagster.core.errors import DagsterInvalidSubsetError

Expand Down Expand Up @@ -1236,3 +1238,29 @@ def the_repo_dupe_graph_pipeline_invalid_sensor_graph():
@repository
def the_repo_dupe_graph_pipeline_invalid_schedule_graph():
return [the_graph, the_schedule]


def test_default_executor_repo():
    """Smoke-test ``@repository`` with ``default_executor_def`` and no definitions.

    The decorated function returns an empty list, so this only checks that
    building the repository with a default executor does not raise.
    """

    @repository(default_executor_def=in_process_executor)
    def the_repo():
        return []


def test_default_executor_assets_repo():
    """Check which jobs receive the repository's ``default_executor_def``.

    The repository is declared with ``in_process_executor`` as its default and
    returns one plain graph and one asset. The job fetched as ``__ASSET_JOB``
    is expected to carry the default executor, while the job fetched under the
    graph's name is expected to keep ``multi_or_in_process_executor``.
    """

    @graph
    def doesnt_use_provided():
        pass

    @asset
    def the_asset():
        pass

    @repository(default_executor_def=in_process_executor)
    def the_repo():
        return [doesnt_use_provided, the_asset]

    # pylint: disable=comparison-with-callable
    asset_job = the_repo.get_job("__ASSET_JOB")
    assert asset_job.executor_def == in_process_executor

    # The default_executor_def is currently only used on the asset job. We may
    # want to change this behavior in the future.
    graph_job = the_repo.get_job("doesnt_use_provided")
    assert graph_job.executor_def == multi_or_in_process_executor

0 comments on commit f0b7346

Please sign in to comment.