Skip to content

Commit

Permalink
Add a field to the now-somewhat-poorly-named RepositoryPythonOrigin t…
Browse files Browse the repository at this point in the history
…hat can add 'deployment target' information within a repo location (#6925)

Summary:
This dict then ends up on the run, which can then be accessed within the run launcher and the executor to incorporate per-location config into the resulting 'deployment target'.
  • Loading branch information
gibsondan committed Apr 13, 2022
1 parent 1600323 commit 9f1d04e
Show file tree
Hide file tree
Showing 16 changed files with 500 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,13 @@ def build_and_tag_test_image(tag):
return subprocess.check_output(["./build.sh", base_python, tag], cwd=get_test_repo_path())


def get_test_project_recon_pipeline(pipeline_name, container_image=None):
def get_test_project_recon_pipeline(pipeline_name, container_image=None, container_context=None):
return ReOriginatedReconstructablePipelineForTest(
ReconstructableRepository.for_file(
file_relative_path(__file__, "test_pipelines/repo.py"),
"define_demo_execution_repo",
container_image=container_image,
container_context=container_context,
).get_reconstructable_pipeline(pipeline_name)
)

Expand Down Expand Up @@ -154,17 +155,15 @@ def get_python_origin(self):
),
container_image=self.repository.container_image,
entry_point=DEFAULT_DAGSTER_ENTRY_POINT,
container_context=self.repository.container_context,
),
)


class ReOriginatedExternalPipelineForTest(ExternalPipeline):
def __init__(
self,
external_pipeline,
container_image=None,
):
def __init__(self, external_pipeline, container_image=None, container_context=None):
self._container_image = container_image
self._container_context = container_context
super(ReOriginatedExternalPipelineForTest, self).__init__(
external_pipeline.external_pipeline_data,
external_pipeline.repository_handle,
Expand All @@ -188,6 +187,7 @@ def get_python_origin(self):
),
container_image=self._container_image,
entry_point=DEFAULT_DAGSTER_ENTRY_POINT,
container_context=self._container_context,
),
)

Expand Down
32 changes: 26 additions & 6 deletions python_modules/dagster/dagster/core/definitions/reconstruct.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import sys
from functools import lru_cache
from typing import TYPE_CHECKING, FrozenSet, List, NamedTuple, Optional, Union, overload
from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, NamedTuple, Optional, Union, overload

from dagster import check, seven
from dagster.core.code_pointer import (
Expand All @@ -20,7 +20,7 @@
)
from dagster.core.selector import parse_solid_selection
from dagster.serdes import pack_value, unpack_value, whitelist_for_serdes
from dagster.utils import frozenlist
from dagster.utils import frozenlist, make_readonly_value
from dagster.utils.backcompat import experimental

from .pipeline_base import IPipeline
Expand All @@ -47,6 +47,7 @@ class ReconstructableRepository(
("container_image", Optional[str]),
("executable_path", Optional[str]),
("entry_point", List[str]),
("container_context", Optional[Dict[str, Any]]),
],
)
):
Expand All @@ -56,6 +57,7 @@ def __new__(
container_image=None,
executable_path=None,
entry_point=None,
container_context=None,
):
return super(ReconstructableRepository, cls).__new__(
cls,
Expand All @@ -67,6 +69,11 @@ def __new__(
if entry_point != None
else DEFAULT_DAGSTER_ENTRY_POINT
),
container_context=(
make_readonly_value(check.opt_dict_param(container_context, "container_context"))
if container_context != None
else None
),
)

@lru_cache(maxsize=1)
Expand All @@ -77,21 +84,34 @@ def get_reconstructable_pipeline(self, name):
return ReconstructablePipeline(self, name)

@classmethod
def for_file(cls, file, fn_name, working_directory=None, container_image=None):
def for_file(
cls, file, fn_name, working_directory=None, container_image=None, container_context=None
):
if not working_directory:
working_directory = os.getcwd()
return cls(FileCodePointer(file, fn_name, working_directory), container_image)
return cls(
FileCodePointer(file, fn_name, working_directory),
container_image=container_image,
container_context=container_context,
)

@classmethod
def for_module(cls, module, fn_name, working_directory=None, container_image=None):
return cls(ModuleCodePointer(module, fn_name, working_directory), container_image)
def for_module(
cls, module, fn_name, working_directory=None, container_image=None, container_context=None
):
return cls(
ModuleCodePointer(module, fn_name, working_directory),
container_image=container_image,
container_context=container_context,
)

def get_python_origin(self):
return RepositoryPythonOrigin(
executable_path=self.executable_path if self.executable_path else sys.executable,
code_pointer=self.pointer,
container_image=self.container_image,
entry_point=self.entry_point,
container_context=self.container_context,
)

def get_python_origin_id(self):
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/core/execution/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dagster.core.definitions import IPipeline, JobDefinition, PipelineDefinition
from dagster.core.definitions.pipeline_base import InMemoryPipeline
from dagster.core.definitions.pipeline_definition import PipelineSubsetDefinition
from dagster.core.definitions.reconstruct import ReconstructablePipeline
from dagster.core.errors import DagsterExecutionInterruptedError, DagsterInvariantViolationError
from dagster.core.events import DagsterEvent, EngineEventData
from dagster.core.execution.context.system import PlanOrchestrationContext
Expand Down Expand Up @@ -429,6 +430,9 @@ def _logged_execute_pipeline(
solid_selection=solid_selection,
solids_to_execute=solids_to_execute,
tags=tags,
pipeline_code_origin=(
pipeline.get_python_origin() if isinstance(pipeline, ReconstructablePipeline) else None
),
)

return execute_run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def step_context_to_step_run_ref(
container_image=recon_pipeline.repository.container_image,
executable_path=recon_pipeline.repository.executable_path,
entry_point=recon_pipeline.repository.entry_point,
container_context=recon_pipeline.repository.container_context,
),
pipeline_name=recon_pipeline.pipeline_name,
solids_to_execute=recon_pipeline.solids_to_execute,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ def executable_path(self) -> Optional[str]:
def container_image(self) -> Optional[str]:
pass

@property
def container_context(self) -> Optional[Dict[str, Any]]:
return None

@property
@abstractmethod
def entry_point(self) -> Optional[List[str]]:
Expand All @@ -256,6 +260,7 @@ def get_repository_python_origin(self, repository_name: str) -> "RepositoryPytho
code_pointer=code_pointer,
container_image=self.container_image,
entry_point=self.entry_point,
container_context=self.container_context,
)


Expand Down
23 changes: 18 additions & 5 deletions python_modules/dagster/dagster/core/origin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, NamedTuple, Optional
from typing import Any, Dict, List, NamedTuple, Optional

from dagster import check
from dagster.core.code_pointer import CodePointer
Expand All @@ -21,13 +21,11 @@ class RepositoryPythonOrigin(
("code_pointer", CodePointer),
("container_image", Optional[str]),
("entry_point", Optional[List[str]]),
("container_context", Optional[Dict[str, Any]]),
],
),
):
"""
Derived from the handle structure in the host process, this is the subset of information
necessary to load a target RepositoryDefinition in a "user process" locally.
Args:
executable_path (str): The Python executable of the user process.
code_pointer (CodePoitner): Once the process has started, an object that can be used to
Expand All @@ -36,9 +34,19 @@ class RepositoryPythonOrigin(
loads the repository. Only used in execution environments that start containers.
entry_point (Optional[List[str]]): The entry point to use when starting a new process
to load the repository. Defaults to ["dagster"] (and may differ from the executable_path).
container_context (Optional[Dict[str, Any]]): Additional context to use when creating a new
container that loads the repository. Keys can be specific to a given compute substrate
(for example, "docker", "k8s", etc.)
"""

def __new__(cls, executable_path, code_pointer, container_image=None, entry_point=None):
def __new__(
cls,
executable_path,
code_pointer,
container_image=None,
entry_point=None,
container_context=None,
):
return super(RepositoryPythonOrigin, cls).__new__(
cls,
check.str_param(executable_path, "executable_path"),
Expand All @@ -49,6 +57,11 @@ def __new__(cls, executable_path, code_pointer, container_image=None, entry_poin
if entry_point != None
else None
),
(
check.opt_dict_param(container_context, "container_context")
if container_context != None
else None
),
)

def get_id(self) -> str:
Expand Down
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ def __setstate__(self, state):
setdefault = __readonly__
del __readonly__

def __hash__(self):
return hash(tuple(sorted(self.items())))


class frozenlist(list):
def __readonly__(self, *args, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def recon_repository_from_origin(origin):
origin.container_image,
origin.executable_path,
origin.entry_point,
origin.container_context,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def test_reconstruct_from_origin():
),
container_image="my_image",
entry_point=DEFAULT_DAGSTER_ENTRY_POINT,
container_context={"docker": {"registry": "my_reg"}},
),
)

Expand All @@ -167,3 +168,4 @@ def test_reconstruct_from_origin():
assert recon_pipeline.repository.pointer == origin.repository_origin.code_pointer
assert recon_pipeline.repository.container_image == origin.repository_origin.container_image
assert recon_pipeline.repository.executable_path == origin.repository_origin.executable_path
assert recon_pipeline.repository.container_context == origin.repository_origin.container_context

0 comments on commit 9f1d04e

Please sign in to comment.