diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 1177aa43e..89de8b48d 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -82,7 +82,11 @@ from metricflow.dataflow.nodes.write_to_table import WriteToResultTableNode from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer from metricflow.dataset.dataset_classes import DataSet -from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PreJoinNodeProcessor +from metricflow.plan_conversion.node_processor import ( + PredicateInputType, + PredicatePushdownParameters, + PreJoinNodeProcessor, +) from metricflow.sql.sql_table import SqlTable logger = logging.getLogger(__name__) @@ -237,6 +241,16 @@ def _build_aggregated_conversion_node( constant_properties: Optional[Sequence[ConstantPropertyInput]] = None, ) -> DataflowPlanNode: """Builds a node that contains aggregated values of conversions and opportunities.""" + # Pushdown parameters vary with conversion metrics due to the way the time joins are applied. + # Due to other outstanding issues with conversion metric filters, we disable predicate + # pushdown for any filter parameter set that is not part of the original time range constraint + # implementation. + disabled_pushdown_parameters = PredicatePushdownParameters.with_pushdown_disabled() + time_range_only_pushdown_parameters = PredicatePushdownParameters( + time_range_constraint=predicate_pushdown_params.time_range_constraint, + pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]), + ) + # Build measure recipes base_required_linkable_specs, _ = self.__get_required_and_extraneous_linkable_specs( queried_linkable_specs=queried_linkable_specs, @@ -244,14 +258,13 @@ def _build_aggregated_conversion_node( ) base_measure_recipe = self._find_dataflow_recipe( measure_spec_properties=self._build_measure_spec_properties([base_measure_spec.measure_spec]), - predicate_pushdown_params=predicate_pushdown_params, + predicate_pushdown_params=time_range_only_pushdown_parameters, linkable_spec_set=base_required_linkable_specs, ) logger.info(f"Recipe for base measure aggregation:\n{mf_pformat(base_measure_recipe)}") conversion_measure_recipe = self._find_dataflow_recipe( measure_spec_properties=self._build_measure_spec_properties([conversion_measure_spec.measure_spec]), - # TODO - Pushdown: Evaluate the potential for applying time constraints and other predicates for conversion - predicate_pushdown_params=PredicatePushdownParameters(time_range_constraint=None), + predicate_pushdown_params=disabled_pushdown_parameters, linkable_spec_set=LinkableSpecSet(), ) logger.info(f"Recipe for conversion measure aggregation:\n{mf_pformat(conversion_measure_recipe)}") @@ -268,7 +281,7 @@ def _build_aggregated_conversion_node( aggregated_base_measure_node = self.build_aggregated_measure( metric_input_measure_spec=base_measure_spec, queried_linkable_specs=queried_linkable_specs, - predicate_pushdown_params=predicate_pushdown_params, + predicate_pushdown_params=time_range_only_pushdown_parameters, ) # Build unaggregated conversions source node @@ -337,10 +350,14 @@ def _build_aggregated_conversion_node( required_local_linkable_specs=base_measure_recipe.required_local_linkable_specs, join_linkable_instances_recipes=base_measure_recipe.join_linkable_instances_recipes, ) + # TODO: Refine conversion metric configuration to fit into the standard dataflow plan building model + # In this case we override the measure recipe, which currently results in us bypassing predicate pushdown + # Rather than relying on happenstance in the way the code is laid out we also explicitly disable + # predicate pushdwon until we are ready to fully support it for conversion metrics aggregated_conversions_node = self.build_aggregated_measure( metric_input_measure_spec=conversion_measure_spec, queried_linkable_specs=queried_linkable_specs, - predicate_pushdown_params=predicate_pushdown_params, + predicate_pushdown_params=disabled_pushdown_parameters, measure_recipe=recipe_with_join_conversion_source_node, ) @@ -492,11 +509,10 @@ def _build_derived_metric_output_node( if not metric_spec.has_time_offset: filter_specs.extend(metric_spec.filter_specs) - # TODO - Pushdown: use parameters to disable pushdown operations instead of clobbering the constraints metric_pushdown_params = ( predicate_pushdown_params if not metric_spec.has_time_offset - else PredicatePushdownParameters(time_range_constraint=None) + else PredicatePushdownParameters.with_pushdown_disabled() ) parent_nodes.append( @@ -867,7 +883,10 @@ def _find_dataflow_recipe( node_data_set_resolver=self._node_data_set_resolver, ) # TODO - Pushdown: Encapsulate this in the node processor - if predicate_pushdown_params.time_range_constraint: + if ( + predicate_pushdown_params.is_pushdown_enabled_for_time_range_constraint + and predicate_pushdown_params.time_range_constraint + ): candidate_nodes_for_left_side_of_join = list( node_processor.add_time_range_constraint( source_nodes=candidate_nodes_for_left_side_of_join, @@ -1298,15 +1317,21 @@ def _build_aggregated_measure_from_measure_source_node( + indent(f"\nmeasure_specs:\n{mf_pformat([measure_spec])}") + indent(f"\nevaluation:\n{mf_pformat(required_linkable_specs)}") ) - - # TODO - Pushdown: Update this to be more robust to additional pushdown parameters measure_time_constraint = ( (cumulative_metric_adjusted_time_constraint or predicate_pushdown_params.time_range_constraint) # If joining to time spine for time offset, constraints will be applied after that join. if not before_aggregation_time_spine_join_description else None ) - measure_pushdown_params = PredicatePushdownParameters(time_range_constraint=measure_time_constraint) + if measure_time_constraint is None: + measure_pushdown_params = PredicatePushdownParameters.without_time_range_constraint( + predicate_pushdown_params + ) + else: + measure_pushdown_params = PredicatePushdownParameters.with_time_range_constraint( + predicate_pushdown_params, time_range_constraint=measure_time_constraint + ) + find_recipe_start_time = time.time() measure_recipe = self._find_dataflow_recipe( measure_spec_properties=measure_properties, diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py index e21649e11..6463f7d3c 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -1,9 +1,11 @@ from __future__ import annotations +import dataclasses import logging -from dataclasses import dataclass -from typing import List, Optional, Sequence, Set +from enum import Enum +from typing import FrozenSet, List, Optional, Sequence, Set +from dbt_semantic_interfaces.enum_extension import assert_values_exhausted from dbt_semantic_interfaces.references import EntityReference, TimeDimensionReference from metricflow_semantics.filters.time_constraint import TimeRangeConstraint from metricflow_semantics.mf_logging.pretty_print import mf_pformat @@ -28,7 +30,7 @@ logger = logging.getLogger(__name__) -@dataclass(frozen=True) +@dataclasses.dataclass(frozen=True) class MultiHopJoinCandidateLineage: """Describes how the multi-hop join candidate was formed. @@ -48,7 +50,7 @@ class MultiHopJoinCandidateLineage: join_second_node_by_entity: LinklessEntitySpec -@dataclass(frozen=True) +@dataclasses.dataclass(frozen=True) class MultiHopJoinCandidate: """A candidate node containing linkable specs that is join of other nodes. It's used to resolve multi-hop queries. @@ -59,14 +61,125 @@ class MultiHopJoinCandidate: lineage: MultiHopJoinCandidateLineage -@dataclass(frozen=True) +class PredicateInputType(Enum): + """Enumeration of predicate input types we may encounter in where filters. + + This is primarily used for describing when predicate pushdown may operate in a dataflow plan, and is necessary + for holistic checks against the set of potentially enabled pushdown operations. For example, in the scenario + scenario where we only allow time range updates, we must do careful overriding of other pushdown properties. + + This also allows us to disable pushdown for things like time dimension filters in cases where we might + accidental censor input data. + """ + + CATEGORICAL_DIMENSION = "categorical_dimension" + ENTITY = "entity" + TIME_DIMENSION = "time_dimension" + TIME_RANGE_CONSTRAINT = "time_range_constraint" + + +@dataclasses.dataclass(frozen=True) class PredicatePushdownParameters: - """Container class for managing filter predicate pushdown. + """Container class for managing information about whether and how to do filter predicate pushdown. - Stores time constraint information for applying pre-join time filters. + The time_range_constraint property holds the time window for setting up a time range filter expression. """ time_range_constraint: Optional[TimeRangeConstraint] + pushdown_enabled_types: FrozenSet[PredicateInputType] = frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]) + + def __post_init__(self) -> None: + """Validation to ensure pushdown properties are configured correctly. + + In particular, this asserts that cases where pushdown is disabled cannot leak pushdown operations via + outside property access - if pushdown is disabled, no further pushdown operations of any kind are allowed + on that particular code branch. It also asserts that unsupported pushdown scenarios are not configured. + """ + invalid_types: Set[PredicateInputType] = set() + + for input_type in self.pushdown_enabled_types: + if ( + input_type is PredicateInputType.CATEGORICAL_DIMENSION + or input_type is PredicateInputType.ENTITY + or input_type is PredicateInputType.TIME_DIMENSION + ): + invalid_types.add(input_type) + elif input_type is PredicateInputType.TIME_RANGE_CONSTRAINT: + continue + else: + assert_values_exhausted(input_type) + + assert len(invalid_types) == 0, ( + "Unsupported predicate input type found in pushdown state configuration! We currently only support " + "predicate pushdown for a subset of possible predicate input types (i.e., types of semantic manifest " + "elements, such as entities and time dimensions, referenced in filter predicates), but this was enabled " + f"for {self.pushdown_enabled_types}, which includes the following invalid types: {invalid_types}." + ) + + if self.is_pushdown_disabled: + # TODO: Include where filter specs when they are added to this class + assert self.time_range_constraint is None, ( + "Invalid pushdown parameter configuration! Disabled pushdown parameters cannot have properties " + "set that may lead to improper access and use in other contexts, as that can lead to unintended " + "filtering operations in cases where these properties are accessed without appropriate checks against " + "pushdown configuration. The following properties should all have None values:\n" + f"time_range_constraint: {self.time_range_constraint}" + ) + + @property + def is_pushdown_disabled(self) -> bool: + """Convenience accessor for checking if pushdown should always be skipped.""" + return len(self.pushdown_enabled_types) == 0 + + @property + def is_pushdown_enabled_for_time_range_constraint(self) -> bool: + """Convenience accessor for checking if pushdown is enabled for time range constraints. + + Note: this time range enabled state is a backwards compatibility shim for use with conversion metrics while + we determine how best to support predicate pushdown for conversion metrics. It may have longer term utility, + but ideally we'd collapse this with the more general time dimension filter input scenarios. + """ + return PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types + + @staticmethod + def with_time_range_constraint( + original_pushdown_params: PredicatePushdownParameters, time_range_constraint: TimeRangeConstraint + ) -> PredicatePushdownParameters: + """Factory method for adding or updating a time range constraint input to a set of pushdown parameters. + + This allows for temporarily overriding a time range constraint with an adjusted one, or enabling a time + range constraint filter if one becomes available mid-stream during dataflow plan construction. + """ + pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.union( + {PredicateInputType.TIME_RANGE_CONSTRAINT} + ) + return PredicatePushdownParameters( + time_range_constraint=time_range_constraint, pushdown_enabled_types=pushdown_enabled_types + ) + + @staticmethod + def without_time_range_constraint( + original_pushdown_params: PredicatePushdownParameters, + ) -> PredicatePushdownParameters: + """Factory method for removing the time range constraint, if any, from the given set of pushdown parameters.""" + pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.difference( + {PredicateInputType.TIME_RANGE_CONSTRAINT} + ) + return PredicatePushdownParameters(time_range_constraint=None, pushdown_enabled_types=pushdown_enabled_types) + + @staticmethod + def with_pushdown_disabled() -> PredicatePushdownParameters: + """Factory method for disabling predicate pushdown for all parameter types. + + This is useful in cases where there is a branched path where pushdown should be disabled in one branch while the + other may remain eligible. For example, a join linkage where one side of the join contains an unsupported + configuration might send a disabled copy of the pushdown parameters down that path while retaining the potential + for using another path. + """ + return PredicatePushdownParameters( + time_range_constraint=None, + pushdown_enabled_types=frozenset(), + ) class PreJoinNodeProcessor: diff --git a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py new file mode 100644 index 000000000..7493745f1 --- /dev/null +++ b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import pytest +from metricflow_semantics.filters.time_constraint import TimeRangeConstraint + +from metricflow.plan_conversion.node_processor import PredicateInputType, PredicatePushdownParameters + + +@pytest.fixture +def all_pushdown_params() -> PredicatePushdownParameters: + """Tests a valid configuration with all predicate properties set and pushdown fully enabled.""" + params = PredicatePushdownParameters( + time_range_constraint=TimeRangeConstraint.all_time(), + ) + return params + + +def test_time_range_pushdown_enabled_states(all_pushdown_params: PredicatePushdownParameters) -> None: + """Tests pushdown enabled check for time range pushdown operations.""" + time_range_only_params = PredicatePushdownParameters( + time_range_constraint=TimeRangeConstraint.all_time(), + pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]), + ) + + enabled_states = { + "fully enabled": all_pushdown_params.is_pushdown_enabled_for_time_range_constraint, + "enabled for time range only": time_range_only_params.is_pushdown_enabled_for_time_range_constraint, + } + + assert all(list(enabled_states.values())), ( + "Expected pushdown to be enabled for pushdown params with time range constraint and global pushdown enabled, " + f"but some params returned False for is_pushdown_enabled.\nPushdown enabled states: {enabled_states}\n" + f"All params: {all_pushdown_params}\nTime range only params: {time_range_only_params}" + ) + + +def test_invalid_disabled_pushdown_params() -> None: + """Tests checks for invalid param configuration on disabled pushdown parameters.""" + with pytest.raises(AssertionError, match="Disabled pushdown parameters cannot have properties set"): + PredicatePushdownParameters( + time_range_constraint=TimeRangeConstraint.all_time(), pushdown_enabled_types=frozenset() + )