From 4f5b093dcd21c34bb8542ab761ed7e46dc50e352 Mon Sep 17 00:00:00 2001 From: tlento Date: Mon, 13 May 2024 17:11:42 -0700 Subject: [PATCH 1/5] Use pushdown params to disable time range constraint pushdown The predicate pushdown operations for time range constraints currently rely on None/not-None state to determine whether or not to push the ConstrainTimeRange filter node to the source. With the switch to pushdown params this None/not-None state gets tenuous, as future updates will need to do things like manage time range constraints, other time dimension filters, and categorical dimension (and entity) filters, and relying on externalizing None/not-None combinations for these various filter types will be challenging. This change encapsulates the enabling and disabling of time range constraints inside of PredicatePushdownParams. At the moment, in order to maintain the existing behavior, we simply internalize the None/not-None behavior for time range constraints. This will also allow us to easily retain pushdown processing for categorical dimensions even when time constraint filters should not be applied, and gradually centralize those controls as we streamline the callsites. There is added complexity to this change because of two things. 1. Time constraint updating is scattered around in the DataflowPlanBuilder 2. Conversion metrics currently do predicate pushdown for time constraints The first of these will be addressed later. Since we cannot reliably support general predicate pushdown for conversion metrics, we need to allow for a time-range-only pushdown operation. Today this is equivalent to pushdown enabled, but that will change shortly. 
--- .../dataflow/builder/dataflow_plan_builder.py | 41 +++++-- metricflow/plan_conversion/node_processor.py | 113 ++++++++++++++++-- .../builder/test_predicate_pushdown.py | 54 +++++++++ 3 files changed, 189 insertions(+), 19 deletions(-) create mode 100644 tests_metricflow/dataflow/builder/test_predicate_pushdown.py diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 1177aa43e..405d96535 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -82,7 +82,11 @@ from metricflow.dataflow.nodes.write_to_table import WriteToResultTableNode from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer from metricflow.dataset.dataset_classes import DataSet -from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PreJoinNodeProcessor +from metricflow.plan_conversion.node_processor import ( + PredicatePushdownParameters, + PredicatePushdownState, + PreJoinNodeProcessor, +) from metricflow.sql.sql_table import SqlTable logger = logging.getLogger(__name__) @@ -237,6 +241,16 @@ def _build_aggregated_conversion_node( constant_properties: Optional[Sequence[ConstantPropertyInput]] = None, ) -> DataflowPlanNode: """Builds a node that contains aggregated values of conversions and opportunities.""" + # Pushdown parameters vary with conversion metrics due to the way the time joins are applied. + # Due to other outstanding issues with conversion metric filters, we disable predicate + # pushdown for any filter parameter set that is not part of the original time range constraint + # implementation. 
+ disabled_pushdown_parameters = PredicatePushdownParameters.with_pushdown_disabled() + time_range_only_pushdown_parameters = PredicatePushdownParameters( + time_range_constraint=predicate_pushdown_params.time_range_constraint, + pushdown_state=PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY, + ) + # Build measure recipes base_required_linkable_specs, _ = self.__get_required_and_extraneous_linkable_specs( queried_linkable_specs=queried_linkable_specs, @@ -244,14 +258,13 @@ def _build_aggregated_conversion_node( ) base_measure_recipe = self._find_dataflow_recipe( measure_spec_properties=self._build_measure_spec_properties([base_measure_spec.measure_spec]), - predicate_pushdown_params=predicate_pushdown_params, + predicate_pushdown_params=time_range_only_pushdown_parameters, linkable_spec_set=base_required_linkable_specs, ) logger.info(f"Recipe for base measure aggregation:\n{mf_pformat(base_measure_recipe)}") conversion_measure_recipe = self._find_dataflow_recipe( measure_spec_properties=self._build_measure_spec_properties([conversion_measure_spec.measure_spec]), - # TODO - Pushdown: Evaluate the potential for applying time constraints and other predicates for conversion - predicate_pushdown_params=PredicatePushdownParameters(time_range_constraint=None), + predicate_pushdown_params=disabled_pushdown_parameters, linkable_spec_set=LinkableSpecSet(), ) logger.info(f"Recipe for conversion measure aggregation:\n{mf_pformat(conversion_measure_recipe)}") @@ -268,7 +281,7 @@ def _build_aggregated_conversion_node( aggregated_base_measure_node = self.build_aggregated_measure( metric_input_measure_spec=base_measure_spec, queried_linkable_specs=queried_linkable_specs, - predicate_pushdown_params=predicate_pushdown_params, + predicate_pushdown_params=time_range_only_pushdown_parameters, ) # Build unaggregated conversions source node @@ -337,10 +350,14 @@ def _build_aggregated_conversion_node( required_local_linkable_specs=base_measure_recipe.required_local_linkable_specs, 
join_linkable_instances_recipes=base_measure_recipe.join_linkable_instances_recipes, ) + # TODO: Refine conversion metric configuration to fit into the standard dataflow plan building model + # In this case we override the measure recipe, which currently results in us bypassing predicate pushdown + # Rather than relying on happenstance in the way the code is laid out we also explicitly disable + # predicate pushdown until we are ready to fully support it for conversion metrics aggregated_conversions_node = self.build_aggregated_measure( metric_input_measure_spec=conversion_measure_spec, queried_linkable_specs=queried_linkable_specs, - predicate_pushdown_params=predicate_pushdown_params, + predicate_pushdown_params=disabled_pushdown_parameters, measure_recipe=recipe_with_join_conversion_source_node, ) @@ -492,11 +509,10 @@ def _build_derived_metric_output_node( if not metric_spec.has_time_offset: filter_specs.extend(metric_spec.filter_specs) - # TODO - Pushdown: use parameters to disable pushdown operations instead of clobbering the constraints metric_pushdown_params = ( predicate_pushdown_params if not metric_spec.has_time_offset - else PredicatePushdownParameters(time_range_constraint=None) + else PredicatePushdownParameters.with_pushdown_disabled() ) parent_nodes.append( @@ -867,7 +883,7 @@ def _find_dataflow_recipe( node_data_set_resolver=self._node_data_set_resolver, ) # TODO - Pushdown: Encapsulate this in the node processor - if predicate_pushdown_params.time_range_constraint: + if predicate_pushdown_params.is_pushdown_enabled and predicate_pushdown_params.time_range_constraint: candidate_nodes_for_left_side_of_join = list( node_processor.add_time_range_constraint( source_nodes=candidate_nodes_for_left_side_of_join, @@ -1298,15 +1314,16 @@ def _build_aggregated_measure_from_measure_source_node( + indent(f"\nmeasure_specs:\n{mf_pformat([measure_spec])}") + indent(f"\nevaluation:\n{mf_pformat(required_linkable_specs)}") ) - - # TODO - Pushdown: Update this to 
be more robust to additional pushdown parameters measure_time_constraint = ( (cumulative_metric_adjusted_time_constraint or predicate_pushdown_params.time_range_constraint) # If joining to time spine for time offset, constraints will be applied after that join. if not before_aggregation_time_spine_join_description else None ) - measure_pushdown_params = PredicatePushdownParameters(time_range_constraint=measure_time_constraint) + measure_pushdown_params = PredicatePushdownParameters.with_time_range_constraint( + predicate_pushdown_params, time_range_constraint=measure_time_constraint + ) + find_recipe_start_time = time.time() measure_recipe = self._find_dataflow_recipe( measure_spec_properties=measure_properties, diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py index e21649e11..4fd46a774 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -1,9 +1,11 @@ from __future__ import annotations +import dataclasses import logging -from dataclasses import dataclass +from enum import Enum from typing import List, Optional, Sequence, Set +from dbt_semantic_interfaces.enum_extension import assert_values_exhausted from dbt_semantic_interfaces.references import EntityReference, TimeDimensionReference from metricflow_semantics.filters.time_constraint import TimeRangeConstraint from metricflow_semantics.mf_logging.pretty_print import mf_pformat @@ -28,7 +30,7 @@ logger = logging.getLogger(__name__) -@dataclass(frozen=True) +@dataclasses.dataclass(frozen=True) class MultiHopJoinCandidateLineage: """Describes how the multi-hop join candidate was formed. @@ -48,7 +50,7 @@ class MultiHopJoinCandidateLineage: join_second_node_by_entity: LinklessEntitySpec -@dataclass(frozen=True) +@dataclasses.dataclass(frozen=True) class MultiHopJoinCandidate: """A candidate node containing linkable specs that is join of other nodes. It's used to resolve multi-hop queries. 
@@ -59,14 +61,111 @@ class MultiHopJoinCandidate: lineage: MultiHopJoinCandidateLineage -@dataclass(frozen=True) +class PredicatePushdownState(Enum): + """Enumeration of constraint states describing when predicate pushdown may operate in a dataflow plan. + + This is necessary for holistic checks against the set of potentially enabled pushdown operations, because the + scenario where we only allow time range updates requires careful overriding of other pushdown properties. + + Note: the time_range_only state is a backwards compatibility shim for use with conversion metrics while + we determine how best to support predicate pushdown for conversion metrics. It may have longer term utility, + but ideally we'd collapse this into a single enabled/disabled boolean property. + """ + + DISABLED = "disabled" + FULLY_ENABLED = "fully_enabled" + ENABLED_FOR_TIME_RANGE_ONLY = "time_range_only" + + +@dataclasses.dataclass(frozen=True) class PredicatePushdownParameters: - """Container class for managing filter predicate pushdown. + """Container class for managing information about whether and how to do filter predicate pushdown. - Stores time constraint information for applying pre-join time filters. + The time_range_constraint property holds the time window for setting up a time range filter expression. """ - time_range_constraint: Optional[TimeRangeConstraint] + _PREDICATE_METADATA_KEY = "is_predicate_property" + + time_range_constraint: Optional[TimeRangeConstraint] = dataclasses.field(metadata={_PREDICATE_METADATA_KEY: True}) + pushdown_state: PredicatePushdownState = PredicatePushdownState.FULLY_ENABLED + + def __post_init__(self) -> None: + """Validation to ensure pushdown properties are configured correctly. + + In particular, this asserts that cases where pushdown is disabled cannot leak pushdown operations via + outside property access - if pushdown is disabled, no further pushdown operations of any kind are allowed + on that particular code branch. 
+ """ + if self.pushdown_state is PredicatePushdownState.FULLY_ENABLED: + return + + invalid_predicate_property_names = { + field.name for field in dataclasses.fields(self) if field.metadata.get(self._PREDICATE_METADATA_KEY) + } + + if self.pushdown_state is PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY: + # We don't do validation for time range constraint configuration - having None set in this state is the + # equivalent of disabling predicate pushdown, but that time constraint value might be updated later, + # and so we do not block overrides to (or from) None to avoid meaningless bookkeeping at callsites. + # Also, we keep the magic string name Python uses hidden in here instead of expanding access to it. + invalid_predicate_property_names.remove("time_range_constraint") + + instance_configuration = dataclasses.asdict(self) + invalid_disabled_properties = { + property_name: value + for property_name, value in instance_configuration.items() + if property_name in invalid_predicate_property_names and value is not None + } + + assert not invalid_disabled_properties, ( + "Invalid pushdown parameter configuration! Disabled pushdown parameters cannot have properties " + "set that may lead to improper access and use in other contexts, as that can lead to unintended " + "filtering operations in cases where these properties are accessed without appropriate checks against " + "pushdown configuration. 
This indicates that pushdown is configured for limited scope operations, but " + f"other predicate properties are set to non-None values.\nFull configuration: {instance_configuration}\n" + f"Invalid predicate properties: {invalid_disabled_properties}" + ) + + @property + def is_pushdown_enabled(self) -> bool: + """Convenience accessor for checking that no pushdown constraints exist.""" + pushdown_state = self.pushdown_state + if pushdown_state is PredicatePushdownState.DISABLED: + return False + elif pushdown_state is PredicatePushdownState.FULLY_ENABLED: + return True + elif pushdown_state is PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY: + return True + else: + return assert_values_exhausted(pushdown_state) + + @staticmethod + def with_time_range_constraint( + original_pushdown_params: PredicatePushdownParameters, time_range_constraint: Optional[TimeRangeConstraint] + ) -> PredicatePushdownParameters: + """Factory method for overriding the time range constraint value in a given set of pushdown parameters. + + This allows for crude updates to the core time range constraint, including selectively disabling time range + predicate pushdown in certain sub-branches of the dataflow plan, such as in complex cases involving time spine + joins and cumulative metrics. + """ + if original_pushdown_params.is_pushdown_enabled: + return PredicatePushdownParameters( + time_range_constraint=time_range_constraint, + ) + else: + return original_pushdown_params + + @staticmethod + def with_pushdown_disabled() -> PredicatePushdownParameters: + """Factory method for disabling predicate pushdown for all parameter types. + + This is useful in cases where there is a branched path where pushdown should be disabled in one branch while the + other may remain eligible. For example, a join linkage where one side of the join contains an unsupported + configuration might send a disabled copy of the pushdown parameters down that path while retaining the potential + for using another path. 
+ """ + return PredicatePushdownParameters(time_range_constraint=None, pushdown_state=PredicatePushdownState.DISABLED) class PreJoinNodeProcessor: diff --git a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py new file mode 100644 index 000000000..397b61a6e --- /dev/null +++ b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +import dataclasses + +import pytest +from metricflow_semantics.filters.time_constraint import TimeRangeConstraint + +from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PredicatePushdownState + + +@pytest.fixture +def all_pushdown_params() -> PredicatePushdownParameters: + """Tests a valid configuration with all predicate properties set and pushdown fully enabled.""" + params = PredicatePushdownParameters( + time_range_constraint=TimeRangeConstraint.all_time(), pushdown_state=PredicatePushdownState.FULLY_ENABLED + ) + predicate_property_names = { + field.name for field in dataclasses.fields(params) if field.metadata.get(params._PREDICATE_METADATA_KEY) + } + predicate_properties = { + name: value for name, value in dataclasses.asdict(params).items() if name in predicate_property_names + } + assert all(value is not None for value in predicate_properties.values()), ( + "All predicate properties in this pushdown param instance should be set to something other than None. 
Found " + f"one or more None values in property map: {predicate_properties}" + ) + return params + + +def test_time_range_pushdown_enabled_states(all_pushdown_params: PredicatePushdownParameters) -> None: + """Tests pushdown enabled check for time range pushdown operations.""" + time_range_only_params = PredicatePushdownParameters( + time_range_constraint=TimeRangeConstraint.all_time(), + pushdown_state=PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY, + ) + + enabled_states = { + "fully enabled": all_pushdown_params.is_pushdown_enabled, + "enabled for time range only": time_range_only_params.is_pushdown_enabled, + } + + assert all(list(enabled_states.values())), ( + "Expected pushdown to be enabled for pushdown params with time range constraint and global pushdown enabled, " + f"but some params returned False for is_pushdown_enabled.\nPushdown enabled states: {enabled_states}\n" + f"All params: {all_pushdown_params}\nTime range only params: {time_range_only_params}" + ) + + +def test_invalid_disabled_pushdown_params() -> None: + """Tests checks for invalid param configuration on disabled pushdown parameters.""" + with pytest.raises(AssertionError, match="Disabled pushdown parameters cannot have properties set"): + PredicatePushdownParameters( + pushdown_state=PredicatePushdownState.DISABLED, time_range_constraint=TimeRangeConstraint.all_time() + ) From 266853939cc6bfbfa23ccd7c35cb4a6a41ea8afd Mon Sep 17 00:00:00 2001 From: tlento Date: Thu, 16 May 2024 17:46:49 -0700 Subject: [PATCH 2/5] Simplify predicate pushdown state management --- .../dataflow/builder/dataflow_plan_builder.py | 21 ++-- metricflow/plan_conversion/node_processor.py | 104 +++++++++--------- .../builder/test_predicate_pushdown.py | 24 +--- 3 files changed, 71 insertions(+), 78 deletions(-) diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 405d96535..2e0ce65a5 100644 --- 
a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -84,8 +84,8 @@ from metricflow.dataset.dataset_classes import DataSet from metricflow.plan_conversion.node_processor import ( PredicatePushdownParameters, - PredicatePushdownState, PreJoinNodeProcessor, + PushdownPredicateInputType, ) from metricflow.sql.sql_table import SqlTable @@ -248,7 +248,7 @@ def _build_aggregated_conversion_node( disabled_pushdown_parameters = PredicatePushdownParameters.with_pushdown_disabled() time_range_only_pushdown_parameters = PredicatePushdownParameters( time_range_constraint=predicate_pushdown_params.time_range_constraint, - pushdown_state=PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY, + pushdown_enabled_types=frozenset([PushdownPredicateInputType.TIME_RANGE_CONSTRAINT]), ) # Build measure recipes @@ -883,7 +883,10 @@ def _find_dataflow_recipe( node_data_set_resolver=self._node_data_set_resolver, ) # TODO - Pushdown: Encapsulate this in the node processor - if predicate_pushdown_params.is_pushdown_enabled and predicate_pushdown_params.time_range_constraint: + if ( + predicate_pushdown_params.is_pushdown_enabled_for_time_range_constraint + and predicate_pushdown_params.time_range_constraint + ): candidate_nodes_for_left_side_of_join = list( node_processor.add_time_range_constraint( source_nodes=candidate_nodes_for_left_side_of_join, @@ -1445,12 +1448,12 @@ def _build_aggregated_measure_from_measure_source_node( if non_additive_dimension_spec is not None: # Apply semi additive join on the node agg_time_dimension = measure_properties.agg_time_dimension - queried_time_dimension_spec: Optional[ - TimeDimensionSpec - ] = self._find_non_additive_dimension_in_linkable_specs( - agg_time_dimension=agg_time_dimension, - linkable_specs=queried_linkable_specs.as_tuple, - non_additive_dimension_spec=non_additive_dimension_spec, + queried_time_dimension_spec: Optional[TimeDimensionSpec] = ( + 
self._find_non_additive_dimension_in_linkable_specs( + agg_time_dimension=agg_time_dimension, + linkable_specs=queried_linkable_specs.as_tuple, + non_additive_dimension_spec=non_additive_dimension_spec, + ) ) time_dimension_spec = TimeDimensionSpec.from_name(non_additive_dimension_spec.name) window_groupings = tuple( diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py index 4fd46a774..1c5ad09fa 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -3,7 +3,7 @@ import dataclasses import logging from enum import Enum -from typing import List, Optional, Sequence, Set +from typing import FrozenSet, List, Optional, Sequence, Set from dbt_semantic_interfaces.enum_extension import assert_values_exhausted from dbt_semantic_interfaces.references import EntityReference, TimeDimensionReference @@ -61,7 +61,7 @@ class MultiHopJoinCandidate: lineage: MultiHopJoinCandidateLineage -class PredicatePushdownState(Enum): +class PushdownPredicateInputType(Enum): """Enumeration of constraint states describing when predicate pushdown may operate in a dataflow plan. This is necessary for holistic checks against the set of potentially enabled pushdown operations, because the @@ -72,9 +72,10 @@ class PredicatePushdownState(Enum): but ideally we'd collapse this into a single enabled/disabled boolean property. """ - DISABLED = "disabled" - FULLY_ENABLED = "fully_enabled" - ENABLED_FOR_TIME_RANGE_ONLY = "time_range_only" + CATEGORICAL_DIMENSION = "categorical_dimension" + ENTITY = "entity" + TIME_DIMENSION = "time_dimension" + TIME_RANGE_CONSTRAINT = "time_range_constraint" @dataclasses.dataclass(frozen=True) @@ -84,60 +85,58 @@ class PredicatePushdownParameters: The time_range_constraint property holds the time window for setting up a time range filter expression. 
""" - _PREDICATE_METADATA_KEY = "is_predicate_property" - - time_range_constraint: Optional[TimeRangeConstraint] = dataclasses.field(metadata={_PREDICATE_METADATA_KEY: True}) - pushdown_state: PredicatePushdownState = PredicatePushdownState.FULLY_ENABLED + time_range_constraint: Optional[TimeRangeConstraint] + pushdown_enabled_types: FrozenSet[PushdownPredicateInputType] = frozenset( + [PushdownPredicateInputType.TIME_RANGE_CONSTRAINT] + ) def __post_init__(self) -> None: """Validation to ensure pushdown properties are configured correctly. In particular, this asserts that cases where pushdown is disabled cannot leak pushdown operations via outside property access - if pushdown is disabled, no further pushdown operations of any kind are allowed - on that particular code branch. + on that particular code branch. It also asserts that unsupported pushdown scenarios are not configured. """ - if self.pushdown_state is PredicatePushdownState.FULLY_ENABLED: - return - - invalid_predicate_property_names = { - field.name for field in dataclasses.fields(self) if field.metadata.get(self._PREDICATE_METADATA_KEY) - } - - if self.pushdown_state is PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY: - # We don't do validation for time range constraint configuration - having None set in this state is the - # equivalent of disabling predicate pushdown, but that time constraint value might be updated later, - # and so we do not block overrides to (or from) None to avoid meaningless bookkeeping at callsites. - # Also, we keep the magic string name Python uses hidden in here instead of expanding access to it. 
- invalid_predicate_property_names.remove("time_range_constraint") - - instance_configuration = dataclasses.asdict(self) - invalid_disabled_properties = { - property_name: value - for property_name, value in instance_configuration.items() - if property_name in invalid_predicate_property_names and value is not None - } - - assert not invalid_disabled_properties, ( - "Invalid pushdown parameter configuration! Disabled pushdown parameters cannot have properties " - "set that may lead to improper access and use in other contexts, as that can lead to unintended " - "filtering operations in cases where these properties are accessed without appropriate checks against " - "pushdown configuration. This indicates that pushdown is configured for limited scope operations, but " - f"other predicate properties are set to non-None values.\nFull configuration: {instance_configuration}\n" - f"Invalid predicate properties: {invalid_disabled_properties}" + invalid_types: Set[PushdownPredicateInputType] = set() + + for input_type in self.pushdown_enabled_types: + if ( + input_type is PushdownPredicateInputType.CATEGORICAL_DIMENSION + or input_type is PushdownPredicateInputType.ENTITY + or input_type is PushdownPredicateInputType.TIME_DIMENSION + ): + invalid_types.add(input_type) + elif input_type is PushdownPredicateInputType.TIME_RANGE_CONSTRAINT: + continue + else: + assert_values_exhausted(input_type) + + assert len(invalid_types) == 0, ( + "Unsupported predicate input type found in pushdown state configuration! We currently only support " + "predicate pushdown for a subset of possible predicate input types (i.e., types of semantic manifest " + "elements, such as entities and time dimensions, referenced in filter predicates), but this was enabled " + f"for {self.pushdown_enabled_types}, which includes the following invalid types: {invalid_types}." 
) + if self.is_pushdown_disabled: + # TODO: Include where filter specs when they are added to this class + assert self.time_range_constraint is None, ( + "Invalid pushdown parameter configuration! Disabled pushdown parameters cannot have properties " + "set that may lead to improper access and use in other contexts, as that can lead to unintended " + "filtering operations in cases where these properties are accessed without appropriate checks against " + "pushdown configuration. The following properties should all have None values:\n" + f"time_range_constraint: {self.time_range_constraint}" + ) + @property - def is_pushdown_enabled(self) -> bool: - """Convenience accessor for checking that no pushdown constraints exist.""" - pushdown_state = self.pushdown_state - if pushdown_state is PredicatePushdownState.DISABLED: - return False - elif pushdown_state is PredicatePushdownState.FULLY_ENABLED: - return True - elif pushdown_state is PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY: - return True - else: - return assert_values_exhausted(pushdown_state) + def is_pushdown_disabled(self) -> bool: + """Convenience accessor for checking if pushdown should always be skipped.""" + return len(self.pushdown_enabled_types) == 0 + + @property + def is_pushdown_enabled_for_time_range_constraint(self) -> bool: + """Convenience accessor for checking if pushdown is enabled for time range constraints.""" + return PushdownPredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types @staticmethod def with_time_range_constraint( @@ -149,7 +148,7 @@ def with_time_range_constraint( predicate pushdown in certain sub-branches of the dataflow plan, such as in complex cases involving time spine joins and cumulative metrics. 
""" - if original_pushdown_params.is_pushdown_enabled: + if original_pushdown_params.is_pushdown_enabled_for_time_range_constraint: return PredicatePushdownParameters( time_range_constraint=time_range_constraint, ) @@ -165,7 +164,10 @@ def with_pushdown_disabled() -> PredicatePushdownParameters: configuration might send a disabled copy of the pushdown parameters down that path while retaining the potential for using another path. """ - return PredicatePushdownParameters(time_range_constraint=None, pushdown_state=PredicatePushdownState.DISABLED) + return PredicatePushdownParameters( + time_range_constraint=None, + pushdown_enabled_types=frozenset(), + ) class PreJoinNodeProcessor: diff --git a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py index 397b61a6e..cf4539381 100644 --- a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py +++ b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py @@ -1,28 +1,16 @@ from __future__ import annotations -import dataclasses - import pytest from metricflow_semantics.filters.time_constraint import TimeRangeConstraint -from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PredicatePushdownState +from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PushdownPredicateInputType @pytest.fixture def all_pushdown_params() -> PredicatePushdownParameters: """Tests a valid configuration with all predicate properties set and pushdown fully enabled.""" params = PredicatePushdownParameters( - time_range_constraint=TimeRangeConstraint.all_time(), pushdown_state=PredicatePushdownState.FULLY_ENABLED - ) - predicate_property_names = { - field.name for field in dataclasses.fields(params) if field.metadata.get(params._PREDICATE_METADATA_KEY) - } - predicate_properties = { - name: value for name, value in dataclasses.asdict(params).items() if name in predicate_property_names - } - assert all(value is 
not None for value in predicate_properties.values()), ( - "All predicate properties in this pushdown param instance should be set to something other than None. Found " - f"one or more None values in property map: {predicate_properties}" + time_range_constraint=TimeRangeConstraint.all_time(), ) return params @@ -31,12 +19,12 @@ def test_time_range_pushdown_enabled_states(all_pushdown_params: PredicatePushdo """Tests pushdown enabled check for time range pushdown operations.""" time_range_only_params = PredicatePushdownParameters( time_range_constraint=TimeRangeConstraint.all_time(), - pushdown_state=PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY, + pushdown_enabled_types=frozenset([PushdownPredicateInputType.TIME_RANGE_CONSTRAINT]), ) enabled_states = { - "fully enabled": all_pushdown_params.is_pushdown_enabled, - "enabled for time range only": time_range_only_params.is_pushdown_enabled, + "fully enabled": all_pushdown_params.is_pushdown_enabled_for_time_range_constraint, + "enabled for time range only": time_range_only_params.is_pushdown_enabled_for_time_range_constraint, } assert all(list(enabled_states.values())), ( @@ -50,5 +38,5 @@ def test_invalid_disabled_pushdown_params() -> None: """Tests checks for invalid param configuration on disabled pushdown parameters.""" with pytest.raises(AssertionError, match="Disabled pushdown parameters cannot have properties set"): PredicatePushdownParameters( - pushdown_state=PredicatePushdownState.DISABLED, time_range_constraint=TimeRangeConstraint.all_time() + time_range_constraint=TimeRangeConstraint.all_time(), pushdown_enabled_types=frozenset() ) From fb9cc1b8bc1d06ebd0b2bb7aa7a9a64c79f4b34e Mon Sep 17 00:00:00 2001 From: tlento Date: Thu, 16 May 2024 18:05:23 -0700 Subject: [PATCH 3/5] Refine time constraint management for predicate pushdown tracking --- .../dataflow/builder/dataflow_plan_builder.py | 23 ++++++++------ metricflow/plan_conversion/node_processor.py | 31 ++++++++++++------- 2 files changed, 34 
insertions(+), 20 deletions(-) diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 2e0ce65a5..d434013ec 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -1323,9 +1323,14 @@ def _build_aggregated_measure_from_measure_source_node( if not before_aggregation_time_spine_join_description else None ) - measure_pushdown_params = PredicatePushdownParameters.with_time_range_constraint( - predicate_pushdown_params, time_range_constraint=measure_time_constraint - ) + if measure_time_constraint is None: + measure_pushdown_params = PredicatePushdownParameters.without_time_range_constraint( + predicate_pushdown_params + ) + else: + measure_pushdown_params = PredicatePushdownParameters.with_time_range_constraint( + predicate_pushdown_params, time_range_constraint=measure_time_constraint + ) find_recipe_start_time = time.time() measure_recipe = self._find_dataflow_recipe( @@ -1448,12 +1453,12 @@ def _build_aggregated_measure_from_measure_source_node( if non_additive_dimension_spec is not None: # Apply semi additive join on the node agg_time_dimension = measure_properties.agg_time_dimension - queried_time_dimension_spec: Optional[TimeDimensionSpec] = ( - self._find_non_additive_dimension_in_linkable_specs( - agg_time_dimension=agg_time_dimension, - linkable_specs=queried_linkable_specs.as_tuple, - non_additive_dimension_spec=non_additive_dimension_spec, - ) + queried_time_dimension_spec: Optional[ + TimeDimensionSpec + ] = self._find_non_additive_dimension_in_linkable_specs( + agg_time_dimension=agg_time_dimension, + linkable_specs=queried_linkable_specs.as_tuple, + non_additive_dimension_spec=non_additive_dimension_spec, ) time_dimension_spec = TimeDimensionSpec.from_name(non_additive_dimension_spec.name) window_groupings = tuple( diff --git a/metricflow/plan_conversion/node_processor.py 
b/metricflow/plan_conversion/node_processor.py index 1c5ad09fa..b664e1ad2 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -140,20 +140,29 @@ def is_pushdown_enabled_for_time_range_constraint(self) -> bool: @staticmethod def with_time_range_constraint( - original_pushdown_params: PredicatePushdownParameters, time_range_constraint: Optional[TimeRangeConstraint] + original_pushdown_params: PredicatePushdownParameters, time_range_constraint: TimeRangeConstraint ) -> PredicatePushdownParameters: - """Factory method for overriding the time range constraint value in a given set of pushdown parameters. + """Factory method for adding or updating a time range constraint input to a set of pushdown parameters. - This allows for crude updates to the core time range constraint, including selectively disabling time range - predicate pushdown in certain sub-branches of the dataflow plan, such as in complex cases involving time spine - joins and cumulative metrics. + This allows for temporarily overriding a time range constraint with an adjusted one, or enabling a time + range constraint filter if one becomes available mid-stream during dataflow plan construction. 
""" - if original_pushdown_params.is_pushdown_enabled_for_time_range_constraint: - return PredicatePushdownParameters( - time_range_constraint=time_range_constraint, - ) - else: - return original_pushdown_params + pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.union( + {PushdownPredicateInputType.TIME_RANGE_CONSTRAINT} + ) + return PredicatePushdownParameters( + time_range_constraint=time_range_constraint, pushdown_enabled_types=pushdown_enabled_types + ) + + @staticmethod + def without_time_range_constraint( + original_pushdown_params: PredicatePushdownParameters, + ) -> PredicatePushdownParameters: + """Factory method for removing the time range constraint, if any, from the given set of pushdown parameters.""" + pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.difference( + {PushdownPredicateInputType.TIME_RANGE_CONSTRAINT} + ) + return PredicatePushdownParameters(time_range_constraint=None, pushdown_enabled_types=pushdown_enabled_types) @staticmethod def with_pushdown_disabled() -> PredicatePushdownParameters: From cdf405afcbc6f7f295582bb073559df0fd437731 Mon Sep 17 00:00:00 2001 From: tlento Date: Thu, 16 May 2024 18:11:08 -0700 Subject: [PATCH 4/5] Rename and fixup documentation for predicate input type enumeration --- .../dataflow/builder/dataflow_plan_builder.py | 4 +- metricflow/plan_conversion/node_processor.py | 41 ++++++++++--------- .../builder/test_predicate_pushdown.py | 4 +- 3 files changed, 26 insertions(+), 23 deletions(-) diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index d434013ec..a19abd6cb 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -83,9 +83,9 @@ from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer from metricflow.dataset.dataset_classes import DataSet from metricflow.plan_conversion.node_processor 
import ( + PredicateInputType, PredicatePushdownParameters, PreJoinNodeProcessor, - PushdownPredicateInputType, ) from metricflow.sql.sql_table import SqlTable @@ -248,7 +248,7 @@ def _build_aggregated_conversion_node( disabled_pushdown_parameters = PredicatePushdownParameters.with_pushdown_disabled() time_range_only_pushdown_parameters = PredicatePushdownParameters( time_range_constraint=predicate_pushdown_params.time_range_constraint, - pushdown_enabled_types=frozenset([PushdownPredicateInputType.TIME_RANGE_CONSTRAINT]), + pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]), ) # Build measure recipes diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py index b664e1ad2..6463f7d3c 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -61,15 +61,15 @@ class MultiHopJoinCandidate: lineage: MultiHopJoinCandidateLineage -class PushdownPredicateInputType(Enum): - """Enumeration of constraint states describing when predicate pushdown may operate in a dataflow plan. +class PredicateInputType(Enum): + """Enumeration of predicate input types we may encounter in where filters. - This is necessary for holistic checks against the set of potentially enabled pushdown operations, because the - scenario where we only allow time range updates requires careful overriding of other pushdown properties. + This is primarily used for describing when predicate pushdown may operate in a dataflow plan, and is necessary + for holistic checks against the set of potentially enabled pushdown operations. For example, in the + scenario where we only allow time range updates, we must do careful overriding of other pushdown properties. - Note: the time_range_only state is a backwards compatibility shim for use with conversion metrics while - we determine how best to support predicate pushdown for conversion metrics.
It may have longer term utility, - but ideally we'd collapse this into a single enabled/disabled boolean property. + This also allows us to disable pushdown for things like time dimension filters in cases where we might + accidentally censor input data. """ CATEGORICAL_DIMENSION = "categorical_dimension" @@ -86,9 +86,7 @@ class PredicatePushdownParameters: """ time_range_constraint: Optional[TimeRangeConstraint] - pushdown_enabled_types: FrozenSet[PushdownPredicateInputType] = frozenset( - [PushdownPredicateInputType.TIME_RANGE_CONSTRAINT] - ) + pushdown_enabled_types: FrozenSet[PredicateInputType] = frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]) def __post_init__(self) -> None: """Validation to ensure pushdown properties are configured correctly. @@ -97,16 +95,16 @@ def __post_init__(self) -> None: outside property access - if pushdown is disabled, no further pushdown operations of any kind are allowed on that particular code branch. It also asserts that unsupported pushdown scenarios are not configured.
""" - invalid_types: Set[PushdownPredicateInputType] = set() + invalid_types: Set[PredicateInputType] = set() for input_type in self.pushdown_enabled_types: if ( - input_type is PushdownPredicateInputType.CATEGORICAL_DIMENSION - or input_type is PushdownPredicateInputType.ENTITY - or input_type is PushdownPredicateInputType.TIME_DIMENSION + input_type is PredicateInputType.CATEGORICAL_DIMENSION + or input_type is PredicateInputType.ENTITY + or input_type is PredicateInputType.TIME_DIMENSION ): invalid_types.add(input_type) - elif input_type is PushdownPredicateInputType.TIME_RANGE_CONSTRAINT: + elif input_type is PredicateInputType.TIME_RANGE_CONSTRAINT: continue else: assert_values_exhausted(input_type) @@ -135,8 +133,13 @@ def is_pushdown_disabled(self) -> bool: @property def is_pushdown_enabled_for_time_range_constraint(self) -> bool: - """Convenience accessor for checking if pushdown is enabled for time range constraints.""" - return PushdownPredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types + """Convenience accessor for checking if pushdown is enabled for time range constraints. + + Note: this time range enabled state is a backwards compatibility shim for use with conversion metrics while + we determine how best to support predicate pushdown for conversion metrics. It may have longer term utility, + but ideally we'd collapse this with the more general time dimension filter input scenarios. + """ + return PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types @staticmethod def with_time_range_constraint( @@ -148,7 +151,7 @@ def with_time_range_constraint( range constraint filter if one becomes available mid-stream during dataflow plan construction. 
""" pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.union( - {PushdownPredicateInputType.TIME_RANGE_CONSTRAINT} + {PredicateInputType.TIME_RANGE_CONSTRAINT} ) return PredicatePushdownParameters( time_range_constraint=time_range_constraint, pushdown_enabled_types=pushdown_enabled_types @@ -160,7 +163,7 @@ def without_time_range_constraint( ) -> PredicatePushdownParameters: """Factory method for removing the time range constraint, if any, from the given set of pushdown parameters.""" pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.difference( - {PushdownPredicateInputType.TIME_RANGE_CONSTRAINT} + {PredicateInputType.TIME_RANGE_CONSTRAINT} ) return PredicatePushdownParameters(time_range_constraint=None, pushdown_enabled_types=pushdown_enabled_types) diff --git a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py index cf4539381..7493745f1 100644 --- a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py +++ b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py @@ -3,7 +3,7 @@ import pytest from metricflow_semantics.filters.time_constraint import TimeRangeConstraint -from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PushdownPredicateInputType +from metricflow.plan_conversion.node_processor import PredicateInputType, PredicatePushdownParameters @pytest.fixture @@ -19,7 +19,7 @@ def test_time_range_pushdown_enabled_states(all_pushdown_params: PredicatePushdo """Tests pushdown enabled check for time range pushdown operations.""" time_range_only_params = PredicatePushdownParameters( time_range_constraint=TimeRangeConstraint.all_time(), - pushdown_enabled_types=frozenset([PushdownPredicateInputType.TIME_RANGE_CONSTRAINT]), + pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]), ) enabled_states = { From 9769994f95375133b108ed0affed25ca55728ee0 Mon Sep 17 00:00:00 2001 From: tlento 
Date: Thu, 16 May 2024 18:26:55 -0700 Subject: [PATCH 5/5] Fitlers is not a word --- metricflow/dataflow/builder/dataflow_plan_builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index a19abd6cb..89de8b48d 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -242,7 +242,7 @@ def _build_aggregated_conversion_node( ) -> DataflowPlanNode: """Builds a node that contains aggregated values of conversions and opportunities.""" # Pushdown parameters vary with conversion metrics due to the way the time joins are applied. - # Due to other outstanding issues with conversion metric fitlers, we disable predicate + # Due to other outstanding issues with conversion metric filters, we disable predicate # pushdown for any filter parameter set that is not part of the original time range constraint # implementation. disabled_pushdown_parameters = PredicatePushdownParameters.with_pushdown_disabled()