Skip to content

Commit

Permalink
Add ability to set mapping key on op/solid invocation context (#7364)
Browse files Browse the repository at this point in the history
* Add ability to set mapping key on op/solid invocation context

* upstream_mapping_key -> mapping_key
  • Loading branch information
dpeng817 committed May 3, 2022
1 parent 1bb7112 commit 441f5ba
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ def __init__(
resources_config: Dict[str, Any],
instance: Optional[DagsterInstance],
partition_key: Optional[str],
mapping_key: Optional[str],
): # pylint: disable=super-init-not-called
from dagster.core.execution.api import ephemeral_instance_if_missing
from dagster.core.execution.context_creation_pipeline import initialize_console_manager

self._solid_config = solid_config
self._mapping_key = mapping_key

self._instance_provided = (
check.opt_inst_param(instance, "instance", DagsterInstance) is not None
Expand Down Expand Up @@ -242,6 +244,7 @@ def bind(
else None,
user_events=self._user_events,
output_metadata=self._output_metadata,
mapping_key=self._mapping_key,
)

def get_events(self) -> List[UserEvent]:
Expand Down Expand Up @@ -287,6 +290,9 @@ def get_output_metadata(
return metadata.get(mapping_key)
return metadata

def get_mapping_key(self) -> Optional[str]:
return self._mapping_key


def _validate_resource_requirements(resources: "Resources", solid_def: SolidDefinition) -> None:
"""Validate correctness of resources against required resource keys"""
Expand Down Expand Up @@ -351,6 +357,7 @@ def __init__(
alias: Optional[str],
user_events: List[UserEvent],
output_metadata: Dict[str, Any],
mapping_key: Optional[str],
):
self._solid_def = solid_def
self._solid_config = solid_config
Expand All @@ -365,6 +372,7 @@ def __init__(
self._user_events: List[UserEvent] = user_events
self._seen_outputs: Dict[str, Union[str, Set[str]]] = {}
self._output_metadata: Dict[str, Any] = output_metadata
self._mapping_key = mapping_key

@property
def solid_config(self) -> Any:
Expand Down Expand Up @@ -466,7 +474,7 @@ def for_type(self, dagster_type: DagsterType) -> TypeCheckContext:
)

def get_mapping_key(self) -> Optional[str]:
return None
return self._mapping_key

def describe_op(self):
if isinstance(self.solid_def, OpDefinition):
Expand Down Expand Up @@ -582,6 +590,7 @@ def build_op_context(
instance: Optional[DagsterInstance] = None,
config: Any = None,
partition_key: Optional[str] = None,
mapping_key: Optional[str] = None,
) -> OpExecutionContext:
"""Builds op execution context from provided parameters.
Expand All @@ -597,6 +606,7 @@ def build_op_context(
config (Optional[Any]): The op config to provide to the context.
instance (Optional[DagsterInstance]): The dagster instance configured for the context.
Defaults to DagsterInstance.ephemeral().
mapping_key (Optional[str]): A key representing the mapping key from an upstream dynamic output. Can be accessed using ``context.get_mapping_key()``.
Examples:
.. code-block:: python
Expand All @@ -621,6 +631,7 @@ def build_op_context(
solid_config=op_config,
instance=instance,
partition_key=partition_key,
mapping_key=mapping_key,
)


Expand All @@ -631,6 +642,7 @@ def build_solid_context(
instance: Optional[DagsterInstance] = None,
config: Any = None,
partition_key: Optional[str] = None,
mapping_key: Optional[str] = None,
) -> UnboundSolidExecutionContext:
"""Builds solid execution context from provided parameters.
Expand Down Expand Up @@ -674,4 +686,5 @@ def build_solid_context(
solid_config=solid_config,
instance=check.opt_inst_param(instance, "instance", DagsterInstance),
partition_key=check.opt_str_param(partition_key, "partition_key"),
mapping_key=check.opt_str_param(mapping_key, "mapping_key"),
)
Original file line number Diff line number Diff line change
Expand Up @@ -1063,3 +1063,15 @@ def fake_func(foo, bar):
new_op = op(name="new_func")(new_func)

assert new_op() == 3


def test_get_mapping_key():
context = build_op_context(mapping_key="the_key")

assert context.get_mapping_key() == "the_key" # Ensure unbound context has mapping key

@op
def basic_op(context):
assert context.get_mapping_key() == "the_key" # Ensure bound context has mapping key

basic_op(context)

0 comments on commit 441f5ba

Please sign in to comment.