Skip to content

Commit

Permalink
[dagster] CompositeSolid (3/N) - config
Browse files Browse the repository at this point in the history
Summary:
Support configuration for CompositeSolids. The current approach is the most straightforward one, nesting the config in the same way the solids are nested.

```
solids:
  composite_name:
    solids:
      solid_name:
        inputs:
        config:
```

We may want to add a way to use solid handles at the top level to flatten things out and keep the config from getting too deeply nested, but I think we'll probably always want to support this approach as well.

Interested to hear what thoughts others have.

depends on D49

Test Plan: new unit tests

Reviewers: max, natekupp, schrockn

Reviewed By: schrockn

Differential Revision: https://dagster.phacility.com/D54
  • Loading branch information
alangenfeld committed May 10, 2019
1 parent d4f5ad0 commit 92e81a9
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 40 deletions.
8 changes: 8 additions & 0 deletions python_modules/dagster/dagster/core/definitions/dependency.py
@@ -1,6 +1,7 @@
from collections import defaultdict, namedtuple

from dagster import check
from dagster.utils import camelcase

from .input import InputDefinition
from .output import OutputDefinition
Expand Down Expand Up @@ -133,6 +134,13 @@ def to_string(self):
# Return unique name of the solid and its lineage (omits solid definition names)
return self.parent.to_string() + '.' + self.name if self.parent else self.name

def camelcase(self):
    """Return the camel-cased, dot-separated form of this handle.

    Each level's ``name`` is passed through the module-level ``camelcase``
    helper; when a ``parent`` is present its own ``camelcase()`` result is
    prefixed, joined with a ``'.'``.
    """
    # Root handle: nothing to prefix.
    if not self.parent:
        return camelcase(self.name)

    # Nested handle: the parent's lineage comes first, then this level.
    return self.parent.camelcase() + '.' + camelcase(self.name)


class SolidInputHandle(namedtuple('_SolidInputHandle', 'solid input_def')):
def __new__(cls, solid, input_def):
Expand Down
131 changes: 96 additions & 35 deletions python_modules/dagster/dagster/core/definitions/environment_configs.py
Expand Up @@ -13,13 +13,13 @@
from dagster.core.types.config import ConfigType, ConfigTypeAttributes
from dagster.core.types.default_applier import apply_default_values
from dagster.core.types.field_utils import FieldImpl, check_opt_field_param
from dagster.utils import camelcase, single_item, merge_dicts
from dagster.utils import camelcase, merge_dicts, single_item

from .context import PipelineContextDefinition
from .dependency import DependencyStructure, Solid, SolidInputHandle
from .dependency import DependencyStructure, Solid, SolidHandle, SolidInputHandle
from .mode import ModeDefinition
from .resource import ResourceDefinition
from .solid import ISolidDefinition
from .solid import CompositeSolidDefinition, ISolidDefinition, SolidDefinition


def SystemNamedDict(name, fields, description=None):
Expand Down Expand Up @@ -222,7 +222,9 @@ def define_environment_cls(creation_data):
'{pipeline_name}.SolidsConfigDictionary'.format(
pipeline_name=pipeline_name
),
creation_data,
creation_data.solids,
creation_data.dependency_structure,
creation_data.pipeline_name,
)
),
'expectations': Field(
Expand Down Expand Up @@ -293,9 +295,11 @@ def solid_has_configurable_outputs(solid_def):
return any(map(lambda out: out.runtime_type.output_schema, solid_def.output_dict.values()))


def get_inputs_field(creation_data, solid):
check.inst_param(creation_data, 'creation_data', EnvironmentClassCreationData)
def get_inputs_field(solid, handle, dependency_structure, pipeline_name):
check.inst_param(solid, 'solid', Solid)
check.inst_param(handle, 'handle', SolidHandle)
check.inst_param(dependency_structure, 'dependency_structure', DependencyStructure)
check.str_param(pipeline_name, 'pipeline_name')

if not solid_has_configurable_inputs(solid.definition):
return None
Expand All @@ -306,26 +310,26 @@ def get_inputs_field(creation_data, solid):
inp_handle = SolidInputHandle(solid, inp)
# If this input is not satisfied by a dependency you must
# provide it via config
if not creation_data.dependency_structure.has_dep(inp_handle):
if not dependency_structure.has_dep(inp_handle):
inputs_field_fields[name] = FieldImpl(inp.runtime_type.input_schema.schema_type)

if not inputs_field_fields:
return None

return Field(
SystemNamedDict(
'{pipeline_name}.{solid_name}.Inputs'.format(
pipeline_name=camelcase(creation_data.pipeline_name),
solid_name=camelcase(solid.name),
'{pipeline_name}.{solid_handle}.Inputs'.format(
pipeline_name=camelcase(pipeline_name), solid_handle=handle.camelcase()
),
inputs_field_fields,
)
)


def get_outputs_field(creation_data, solid):
check.inst_param(creation_data, 'creation_data', EnvironmentClassCreationData)
def get_outputs_field(solid, handle, pipeline_name):
check.inst_param(solid, 'solid', Solid)
check.inst_param(handle, 'handle', SolidHandle)
check.str_param(pipeline_name, 'pipeline_name')

solid_def = solid.definition

Expand All @@ -340,8 +344,8 @@ def get_outputs_field(creation_data, solid):
)

output_entry_dict = SystemNamedDict(
'{pipeline_name}.{solid_name}.Outputs'.format(
pipeline_name=camelcase(creation_data.pipeline_name), solid_name=camelcase(solid.name)
'{pipeline_name}.{solid_handle}.Outputs'.format(
pipeline_name=camelcase(pipeline_name), solid_handle=handle.camelcase()
),
output_dict_fields,
)
Expand All @@ -351,30 +355,83 @@ def get_outputs_field(creation_data, solid):

def solid_has_config_entry(solid_def):
check.inst_param(solid_def, 'solid_def', ISolidDefinition)

has_solid_config = False
if isinstance(solid_def, CompositeSolidDefinition):
has_solid_config = any(
map(solid_has_config_entry, [solid.definition for solid in solid_def.solids])
)
elif isinstance(solid_def, SolidDefinition):
has_solid_config = solid_def.config_field
else:
check.invariant('Unexpected ISolidDefinition type {type}'.format(type=type(solid_def)))

return (
solid_def.config_field
has_solid_config
or solid_has_configurable_inputs(solid_def)
or solid_has_configurable_outputs(solid_def)
)


def define_solid_dictionary_cls(name, creation_data):
def define_isolid_field(solid, handle, dependency_structure, pipeline_name):
    """Build the config ``Field`` for a single solid instance.

    * If ``solid.definition`` is a ``CompositeSolidDefinition``, the result is a
      dict field with a single ``'solids'`` entry. That entry's type is built by
      ``define_solid_dictionary_cls`` over the composite's own solids and its own
      dependency structure, with ``handle`` passed as the parent handle.
    * If ``solid.definition`` is a ``SolidDefinition``, the result wraps the type
      built by ``define_solid_config_cls`` from the definition's ``config_field``
      together with the inputs and outputs fields computed for this solid.

    Args:
        solid (Solid): The solid instance to build a config field for.
        handle (SolidHandle): Handle of ``solid``; its ``camelcase()`` form is
            used in the generated type names.
        dependency_structure (DependencyStructure): Dependency structure of the
            scope containing ``solid``. Only the leaf branch uses it (forwarded
            to ``get_inputs_field``); the composite branch uses the composite
            definition's own dependency structure instead.
        pipeline_name (str): Name of the enclosing pipeline, used in the
            generated type names.

    Returns:
        Field: The config field for ``solid``.

    Raises:
        Fails through ``check.failed`` when ``solid.definition`` is neither a
        ``CompositeSolidDefinition`` nor a ``SolidDefinition``.
    """
    check.inst_param(solid, 'solid', Solid)
    check.inst_param(handle, 'handle', SolidHandle)
    check.inst_param(dependency_structure, 'dependency_structure', DependencyStructure)
    check.str_param(pipeline_name, 'pipeline_name')

    definition = solid.definition

    if isinstance(definition, CompositeSolidDefinition):
        # Recurse into the composite: its inner solids are configured under a
        # nested 'solids' key, using the composite's own dependency structure.
        solid_cfg = Field(
            define_solid_dictionary_cls(
                '{pipeline_name}.CompositeSolidsDict.{solid_handle}'.format(
                    pipeline_name=camelcase(pipeline_name), solid_handle=handle.camelcase()
                ),
                definition.solids,
                definition.dependency_structure,
                pipeline_name,
                handle,
            )
        )
        return Field(
            SystemNamedDict(
                '{name}CompositeSolidConfig'.format(name=str(handle)), {'solids': solid_cfg}
            )
        )

    if isinstance(definition, SolidDefinition):
        solid_config_type = define_solid_config_cls(
            '{pipeline_name}.SolidConfig.{solid_handle}'.format(
                pipeline_name=camelcase(pipeline_name), solid_handle=handle.camelcase()
            ),
            definition.config_field,
            inputs_field=get_inputs_field(solid, handle, dependency_structure, pipeline_name),
            outputs_field=get_outputs_field(solid, handle, pipeline_name),
        )
        return Field(solid_config_type)

    # check.invariant(<message>) would treat the non-empty message string as a
    # truthy condition and never fail; check.failed raises unconditionally.
    check.failed('Unexpected ISolidDefinition type {type}'.format(type=type(definition)))


def define_solid_dictionary_cls(
name, solids, dependency_structure, pipeline_name, parent_handle=None
):
check.str_param(name, 'name')
check.inst_param(creation_data, 'creation_data', EnvironmentClassCreationData)
check.list_param(solids, 'solids', of_type=Solid)
check.inst_param(dependency_structure, 'dependency_structure', DependencyStructure)
check.str_param(pipeline_name, 'pipeline_name')
check.opt_inst_param(parent_handle, 'parent_handle', SolidHandle)

fields = {}
for solid in creation_data.solids:
for solid in solids:
if solid_has_config_entry(solid.definition):
solid_config_type = define_solid_config_cls(
'{pipeline_name}.SolidConfig.{solid_name}'.format(
pipeline_name=camelcase(creation_data.pipeline_name),
solid_name=camelcase(solid.name),
),
solid.definition.config_field,
inputs_field=get_inputs_field(creation_data, solid),
outputs_field=get_outputs_field(creation_data, solid),
fields[solid.name] = define_isolid_field(
solid,
SolidHandle(solid.name, solid.definition.name, parent_handle),
dependency_structure,
pipeline_name,
)
fields[solid.name] = Field(solid_config_type)

return SystemNamedDict(name, fields)

Expand Down Expand Up @@ -418,12 +475,16 @@ def construct_context_config(config_value):
)


def construct_solid_dictionary(solid_dict_value):
return {
key: SolidConfig(
config=value.get('config'),
inputs=value.get('inputs', {}),
outputs=value.get('outputs', []),
def construct_solid_dictionary(solid_dict_value, parent_handle=None, config_map=None):
config_map = {} if config_map is None else config_map
for name, value in solid_dict_value.items():
key = SolidHandle(name, None, parent_handle)
config = value.get('config')
config_map[str(key)] = SolidConfig(
config=config, inputs=value.get('inputs', {}), outputs=value.get('outputs', [])
)
for key, value in solid_dict_value.items()
}
# solids implies a composite solid config
if value.get('solids'):
construct_solid_dictionary(value['solids'], key, config_map)

return config_map
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/core/execution_plan/plan.py
Expand Up @@ -229,9 +229,9 @@ def get_input_source_step_handle(plan_builder, solid, input_name, input_def, han
check.opt_inst_param(handle, 'handle', SolidHandle)

input_handle = solid.input_handle(input_name)
solid_config = plan_builder.environment_config.solids.get(solid.name)
dependency_structure = plan_builder.get_current_dependency_structure()

solid_config = plan_builder.environment_config.solids.get(str(handle))
dependency_structure = plan_builder.get_current_dependency_structure()
if solid_config and input_def.name in solid_config.inputs:
step_creation_data = create_input_thunk_execution_step(
plan_builder.pipeline_name,
Expand Down
Expand Up @@ -286,7 +286,7 @@ def test_solid_dictionary_type():
pipeline_def = define_test_solids_config_pipeline()

solid_dict_type = define_solid_dictionary_cls(
'foobar', create_creation_data(pipeline_def)
'foobar', pipeline_def.solids, pipeline_def.dependency_structure, pipeline_def.name
).inst()

value = construct_solid_dictionary(
Expand Down Expand Up @@ -371,7 +371,7 @@ def test_solid_dictionary_some_no_config():
)

solid_dict_type = define_solid_dictionary_cls(
'foobar', create_creation_data(pipeline_def)
'foobar', pipeline_def.solids, pipeline_def.dependency_structure, pipeline_def.name
).inst()

value = construct_solid_dictionary(
Expand Down Expand Up @@ -432,8 +432,9 @@ def test_whole_environment():


def test_solid_config_error():
pipeline_def = define_test_solids_config_pipeline()
solid_dict_type = define_solid_dictionary_cls(
'slkdfjkjdsf', create_creation_data(define_test_solids_config_pipeline())
'slkdfjkjdsf', pipeline_def.solids, pipeline_def.dependency_structure, pipeline_def.name
).inst()

int_solid_config_type = solid_dict_type.fields['int_config_solid'].config_type
Expand Down
40 changes: 40 additions & 0 deletions python_modules/dagster/dagster_tests/core_tests/test_composites.py
Expand Up @@ -4,6 +4,10 @@
PipelineDefinition,
SolidInstance,
execute_pipeline,
Field,
String,
solid,
InputDefinition,
)
from dagster.core.utility_solids import (
create_root_solid,
Expand Down Expand Up @@ -54,3 +58,39 @@ def test_composite_basic_execution():
empty_composite = CompositeSolidDefinition(name='empty', solids=[])
result = execute_pipeline(PipelineDefinition(solids=[empty_composite]))
assert result.success


def test_composite_config():
    """Config for a solid nested two composites deep is delivered to that solid."""

    @solid(config_field=Field(String))
    def configured(context):
        # Compare by value: identity (`is`) against a string literal depends on
        # interning and is not a reliable equality check.
        assert context.solid_config == 'yes'

    inner = CompositeSolidDefinition(name='inner', solids=[configured])
    outer = CompositeSolidDefinition(name='outer', solids=[inner])
    pipeline = PipelineDefinition(name='composites_pipeline', solids=[outer])
    result = execute_pipeline(
        pipeline,
        {'solids': {'outer': {'solids': {'inner': {'solids': {'configured': {'config': 'yes'}}}}}}},
    )
    assert result.success


def test_composite_config_input():
    """An input supplied via config reaches a solid nested two composites deep."""

    @solid(inputs=[InputDefinition('one')])
    def node_a(_context, one):
        # Compare by value: identity (`is`) against an int literal relies on an
        # implementation detail (small-int caching), not on equality.
        assert one == 1

    inner = CompositeSolidDefinition(name='inner', solids=[node_a])
    outer = CompositeSolidDefinition(name='outer', solids=[inner])
    pipeline = PipelineDefinition(name='composites_pipeline', solids=[outer])
    result = execute_pipeline(
        pipeline,
        {
            'solids': {
                'outer': {
                    'solids': {'inner': {'solids': {'node_a': {'inputs': {'one': {'value': 1}}}}}}
                }
            }
        },
    )
    assert result.success

0 comments on commit 92e81a9

Please sign in to comment.