From 92e81a95108d0da75dae16109d62366bba3f2f38 Mon Sep 17 00:00:00 2001 From: alangenfeld Date: Tue, 7 May 2019 13:53:44 -0700 Subject: [PATCH] [dagster] CompositeSolid (3/N) - config Summary: Support configuration for CompositeSolids. The current approach is the most straightforward one, nesting the config in the same way the solids are nested. ``` solids: composite_name: solids: solid_name: inputs: config: ``` We may want to add a way to use solid handles in the top level to flatten out things from getting too crazy nested, but I think we'll probably always want to support this approach as well? Interested to hear what thoughts others have. depends on D49 Test Plan: new unit tests Reviewers: max, natekupp, schrockn Reviewed By: schrockn Differential Revision: https://dagster.phacility.com/D54 --- .../dagster/core/definitions/dependency.py | 8 ++ .../core/definitions/environment_configs.py | 131 +++++++++++++----- .../dagster/core/execution_plan/plan.py | 4 +- .../system_config_tests/test_system_config.py | 7 +- .../core_tests/test_composites.py | 40 ++++++ 5 files changed, 150 insertions(+), 40 deletions(-) diff --git a/python_modules/dagster/dagster/core/definitions/dependency.py b/python_modules/dagster/dagster/core/definitions/dependency.py index d487cacdfb78..e01cf6e890a7 100644 --- a/python_modules/dagster/dagster/core/definitions/dependency.py +++ b/python_modules/dagster/dagster/core/definitions/dependency.py @@ -1,6 +1,7 @@ from collections import defaultdict, namedtuple from dagster import check +from dagster.utils import camelcase from .input import InputDefinition from .output import OutputDefinition @@ -133,6 +134,13 @@ def to_string(self): # Return unique name of the solid and its lineage (omits solid definition names) return self.parent.to_string() + '.' + self.name if self.parent else self.name + def camelcase(self): + return ( + self.parent.camelcase() + '.' + camelcase(self.name) + if self.parent + else camelcase(self.name) + ) + class SolidInputHandle(namedtuple('_SolidInputHandle', 'solid input_def')): def __new__(cls, solid, input_def): diff --git a/python_modules/dagster/dagster/core/definitions/environment_configs.py b/python_modules/dagster/dagster/core/definitions/environment_configs.py index 59dbb7c49801..1a4e117f28e8 100644 --- a/python_modules/dagster/dagster/core/definitions/environment_configs.py +++ b/python_modules/dagster/dagster/core/definitions/environment_configs.py @@ -13,13 +13,13 @@ from dagster.core.types.config import ConfigType, ConfigTypeAttributes from dagster.core.types.default_applier import apply_default_values from dagster.core.types.field_utils import FieldImpl, check_opt_field_param -from dagster.utils import camelcase, single_item, merge_dicts +from dagster.utils import camelcase, merge_dicts, single_item from .context import PipelineContextDefinition -from .dependency import DependencyStructure, Solid, SolidInputHandle +from .dependency import DependencyStructure, Solid, SolidHandle, SolidInputHandle from .mode import ModeDefinition from .resource import ResourceDefinition -from .solid import ISolidDefinition +from .solid import CompositeSolidDefinition, ISolidDefinition, SolidDefinition def SystemNamedDict(name, fields, description=None): @@ -222,7 +222,9 @@ def define_environment_cls(creation_data): '{pipeline_name}.SolidsConfigDictionary'.format( pipeline_name=pipeline_name ), - creation_data, + creation_data.solids, + creation_data.dependency_structure, + creation_data.pipeline_name, ) ), 'expectations': Field( @@ -293,9 +295,11 @@ def solid_has_configurable_outputs(solid_def): return any(map(lambda out: out.runtime_type.output_schema, solid_def.output_dict.values())) -def get_inputs_field(creation_data, solid): - check.inst_param(creation_data, 'creation_data', EnvironmentClassCreationData) +def get_inputs_field(solid, handle, dependency_structure, pipeline_name): check.inst_param(solid, 'solid', Solid) + check.inst_param(handle, 'handle', SolidHandle) + check.inst_param(dependency_structure, 'dependency_structure', DependencyStructure) + check.str_param(pipeline_name, 'pipeline_name') if not solid_has_configurable_inputs(solid.definition): return None @@ -306,7 +310,7 @@ def get_inputs_field(creation_data, solid): inp_handle = SolidInputHandle(solid, inp) # If this input is not satisfied by a dependency you must # provide it via config - if not creation_data.dependency_structure.has_dep(inp_handle): + if not dependency_structure.has_dep(inp_handle): inputs_field_fields[name] = FieldImpl(inp.runtime_type.input_schema.schema_type) if not inputs_field_fields: @@ -314,18 +318,18 @@ def get_inputs_field(creation_data, solid): return Field( SystemNamedDict( - '{pipeline_name}.{solid_name}.Inputs'.format( - pipeline_name=camelcase(creation_data.pipeline_name), - solid_name=camelcase(solid.name), + '{pipeline_name}.{solid_handle}.Inputs'.format( + pipeline_name=camelcase(pipeline_name), solid_handle=handle.camelcase() ), inputs_field_fields, ) ) -def get_outputs_field(creation_data, solid): - check.inst_param(creation_data, 'creation_data', EnvironmentClassCreationData) +def get_outputs_field(solid, handle, pipeline_name): check.inst_param(solid, 'solid', Solid) + check.inst_param(handle, 'handle', SolidHandle) + check.str_param(pipeline_name, 'pipeline_name') solid_def = solid.definition @@ -340,8 +344,8 @@ def get_outputs_field(creation_data, solid): ) output_entry_dict = SystemNamedDict( - '{pipeline_name}.{solid_name}.Outputs'.format( - pipeline_name=camelcase(creation_data.pipeline_name), solid_name=camelcase(solid.name) + '{pipeline_name}.{solid_handle}.Outputs'.format( + pipeline_name=camelcase(pipeline_name), solid_handle=handle.camelcase() ), output_dict_fields, ) @@ -351,30 +355,83 @@ def get_outputs_field(creation_data, solid): def solid_has_config_entry(solid_def): check.inst_param(solid_def, 'solid_def', ISolidDefinition) + + has_solid_config = False + if isinstance(solid_def, CompositeSolidDefinition): + has_solid_config = any( + map(solid_has_config_entry, [solid.definition for solid in solid_def.solids]) + ) + elif isinstance(solid_def, SolidDefinition): + has_solid_config = solid_def.config_field + else: + check.invariant('Unexpected ISolidDefinition type {type}'.format(type=type(solid_def))) + return ( - solid_def.config_field + has_solid_config or solid_has_configurable_inputs(solid_def) or solid_has_configurable_outputs(solid_def) ) -def define_solid_dictionary_cls(name, creation_data): +def define_isolid_field(solid, handle, dependency_structure, pipeline_name): + check.inst_param(solid, 'solid', Solid) + check.inst_param(handle, 'handle', SolidHandle) + + check.str_param(pipeline_name, 'pipeline_name') + + if isinstance(solid.definition, CompositeSolidDefinition): + composite_def = solid.definition + solid_cfg = Field( + define_solid_dictionary_cls( + '{pipeline_name}.CompositeSolidsDict.{solid_handle}'.format( + pipeline_name=camelcase(pipeline_name), solid_handle=handle.camelcase() + ), + composite_def.solids, + composite_def.dependency_structure, + pipeline_name, + handle, + ) + ) + return Field( + SystemNamedDict( + '{name}CompositeSolidConfig'.format(name=str(handle)), {'solids': solid_cfg} + ) + ) + + elif isinstance(solid.definition, SolidDefinition): + solid_config_type = define_solid_config_cls( + '{pipeline_name}.SolidConfig.{solid_handle}'.format( + pipeline_name=camelcase(pipeline_name), solid_handle=handle.camelcase() + ), + solid.definition.config_field, + inputs_field=get_inputs_field(solid, handle, dependency_structure, pipeline_name), + outputs_field=get_outputs_field(solid, handle, pipeline_name), + ) + return Field(solid_config_type) + else: + check.invariant( + 'Unexpected ISolidDefinition type {type}'.format(type=type(solid.definition)) + ) + + +def define_solid_dictionary_cls( + name, solids, dependency_structure, pipeline_name, parent_handle=None +): check.str_param(name, 'name') - check.inst_param(creation_data, 'creation_data', EnvironmentClassCreationData) + check.list_param(solids, 'solids', of_type=Solid) + check.inst_param(dependency_structure, 'dependency_structure', DependencyStructure) + check.str_param(pipeline_name, 'pipeline_name') + check.opt_inst_param(parent_handle, 'parent_handle', SolidHandle) fields = {} - for solid in creation_data.solids: + for solid in solids: if solid_has_config_entry(solid.definition): - solid_config_type = define_solid_config_cls( - '{pipeline_name}.SolidConfig.{solid_name}'.format( - pipeline_name=camelcase(creation_data.pipeline_name), - solid_name=camelcase(solid.name), - ), - solid.definition.config_field, - inputs_field=get_inputs_field(creation_data, solid), - outputs_field=get_outputs_field(creation_data, solid), + fields[solid.name] = define_isolid_field( + solid, + SolidHandle(solid.name, solid.definition.name, parent_handle), + dependency_structure, + pipeline_name, ) - fields[solid.name] = Field(solid_config_type) return SystemNamedDict(name, fields) @@ -418,12 +475,16 @@ def construct_context_config(config_value): ) -def construct_solid_dictionary(solid_dict_value): - return { - key: SolidConfig( - config=value.get('config'), - inputs=value.get('inputs', {}), - outputs=value.get('outputs', []), +def construct_solid_dictionary(solid_dict_value, parent_handle=None, config_map=None): + config_map = {} if config_map is None else config_map + for name, value in solid_dict_value.items(): + key = SolidHandle(name, None, parent_handle) + config = value.get('config') + config_map[str(key)] = SolidConfig( + config=config, inputs=value.get('inputs', {}), outputs=value.get('outputs', []) ) - for key, value in solid_dict_value.items() - } + # solids implies a composite solid config + if value.get('solids'): + construct_solid_dictionary(value['solids'], key, config_map) + + return config_map diff --git a/python_modules/dagster/dagster/core/execution_plan/plan.py b/python_modules/dagster/dagster/core/execution_plan/plan.py index bbd527b8a07f..6d3ec2b9be38 100644 --- a/python_modules/dagster/dagster/core/execution_plan/plan.py +++ b/python_modules/dagster/dagster/core/execution_plan/plan.py @@ -229,9 +229,9 @@ def get_input_source_step_handle(plan_builder, solid, input_name, input_def, han check.opt_inst_param(handle, 'handle', SolidHandle) input_handle = solid.input_handle(input_name) - solid_config = plan_builder.environment_config.solids.get(solid.name) - dependency_structure = plan_builder.get_current_dependency_structure() + solid_config = plan_builder.environment_config.solids.get(str(handle)) + dependency_structure = plan_builder.get_current_dependency_structure() if solid_config and input_def.name in solid_config.inputs: step_creation_data = create_input_thunk_execution_step( plan_builder.pipeline_name, diff --git a/python_modules/dagster/dagster_tests/core_tests/system_config_tests/test_system_config.py b/python_modules/dagster/dagster_tests/core_tests/system_config_tests/test_system_config.py index 582f6aeb4303..b22808c10ae1 100644 --- a/python_modules/dagster/dagster_tests/core_tests/system_config_tests/test_system_config.py +++ b/python_modules/dagster/dagster_tests/core_tests/system_config_tests/test_system_config.py @@ -286,7 +286,7 @@ def test_solid_dictionary_type(): pipeline_def = define_test_solids_config_pipeline() solid_dict_type = define_solid_dictionary_cls( - 'foobar', create_creation_data(pipeline_def) + 'foobar', pipeline_def.solids, pipeline_def.dependency_structure, pipeline_def.name ).inst() value = construct_solid_dictionary( @@ -371,7 +371,7 @@ def test_solid_dictionary_some_no_config(): ) solid_dict_type = define_solid_dictionary_cls( - 'foobar', create_creation_data(pipeline_def) + 'foobar', pipeline_def.solids, pipeline_def.dependency_structure, pipeline_def.name ).inst() value = construct_solid_dictionary( @@ -432,8 +432,9 @@ def test_whole_environment(): def test_solid_config_error(): + pipeline_def = define_test_solids_config_pipeline() solid_dict_type = define_solid_dictionary_cls( - 'slkdfjkjdsf', create_creation_data(define_test_solids_config_pipeline()) + 'slkdfjkjdsf', pipeline_def.solids, pipeline_def.dependency_structure, pipeline_def.name ).inst() int_solid_config_type = solid_dict_type.fields['int_config_solid'].config_type diff --git a/python_modules/dagster/dagster_tests/core_tests/test_composites.py b/python_modules/dagster/dagster_tests/core_tests/test_composites.py index 207804d9f9b0..8b9cbfaf184d 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_composites.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_composites.py @@ -4,6 +4,10 @@ PipelineDefinition, SolidInstance, execute_pipeline, + Field, + String, + solid, + InputDefinition, ) from dagster.core.utility_solids import ( create_root_solid, @@ -54,3 +58,39 @@ def test_composite_basic_execution(): empty_composite = CompositeSolidDefinition(name='empty', solids=[]) result = execute_pipeline(PipelineDefinition(solids=[empty_composite])) assert result.success + + +def test_composite_config(): + @solid(config_field=Field(String)) + def configured(context): + assert context.solid_config is 'yes' + + inner = CompositeSolidDefinition(name='inner', solids=[configured]) + outer = CompositeSolidDefinition(name='outer', solids=[inner]) + pipeline = PipelineDefinition(name='composites_pipeline', solids=[outer]) + result = execute_pipeline( + pipeline, + {'solids': {'outer': {'solids': {'inner': {'solids': {'configured': {'config': 'yes'}}}}}}}, + ) + assert result.success + + +def test_composite_config_input(): + @solid(inputs=[InputDefinition('one')]) + def node_a(_context, one): + assert one is 1 + + inner = CompositeSolidDefinition(name='inner', solids=[node_a]) + outer = CompositeSolidDefinition(name='outer', solids=[inner]) + pipeline = PipelineDefinition(name='composites_pipeline', solids=[outer]) + result = execute_pipeline( + pipeline, + { + 'solids': { + 'outer': { + 'solids': {'inner': {'solids': {'node_a': {'inputs': {'one': {'value': 1}}}}}} + } + } + }, + ) + assert result.success