diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index c2ab75a641..b61fc93836 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -882,16 +882,13 @@ def _find_dataflow_recipe( semantic_model_lookup=self._semantic_model_lookup, node_data_set_resolver=self._node_data_set_resolver, ) - # TODO - Pushdown: Encapsulate this in the node processor - if ( - predicate_pushdown_state.is_pushdown_enabled_for_time_range_constraint - and predicate_pushdown_state.time_range_constraint - ): + + if predicate_pushdown_state.has_pushdown_potential: candidate_nodes_for_left_side_of_join = list( - node_processor.add_time_range_constraint( + node_processor.apply_matching_filter_predicates( source_nodes=candidate_nodes_for_left_side_of_join, + predicate_pushdown_state=predicate_pushdown_state, metric_time_dimension_reference=self._metric_time_dimension_reference, - time_range_constraint=predicate_pushdown_state.time_range_constraint, ) ) diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py index da471f4ff0..9373cf0c66 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -125,30 +125,36 @@ def __post_init__(self) -> None: f"for {self.pushdown_enabled_types}, which includes the following invalid types: {invalid_types}." ) - if self.is_pushdown_disabled: - # TODO: Include where filter specs when they are added to this class and check state -> predicate pairs - assert self.time_range_constraint is None, ( - "Invalid pushdown state configuration! Disabled pushdown state objects cannot have properties " - "set that may lead to improper access and use in other contexts, as that can lead to unintended " - "filtering operations in cases where these properties are accessed without appropriate checks against " - "pushdown configuration. The following properties should all have None values:\n" - f"time_range_constraint: {self.time_range_constraint}" - ) + # TODO: Include where filter specs when they are added to this class + time_range_constraint_is_valid = ( + self.time_range_constraint is None + or PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types + ) + assert time_range_constraint_is_valid, ( + "Invalid pushdown state configuration! Disabled pushdown state objects cannot have properties " + "set that may lead to improper access and use in other contexts, as that can lead to unintended " + "filtering operations in cases where these properties are accessed without appropriate checks against " + "pushdown configuration. The following properties should all have None values:\n" + f"time_range_constraint: {self.time_range_constraint}" + ) @property - def is_pushdown_disabled(self) -> bool: - """Convenience accessor for checking if pushdown should always be skipped.""" - return len(self.pushdown_enabled_types) == 0 + def has_pushdown_potential(self) -> bool: + """Returns whether or not pushdown is enabled for a type with predicate candidates in place.""" + return self.has_time_range_constraint_to_push_down @property - def is_pushdown_enabled_for_time_range_constraint(self) -> bool: - """Convenience accessor for checking if pushdown is enabled for time range constraints. + def has_time_range_constraint_to_push_down(self) -> bool: + """Convenience accessor for checking if there is a time range constraint that can be pushed down. Note: this time range enabled state is a backwards compatibility shim for use with conversion metrics while we determine how best to support predicate pushdown for conversion metrics. It may have longer term utility, but ideally we'd collapse this with the more general time dimension filter input scenarios. """ - return PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types + return ( + PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types + and self.time_range_constraint is not None + ) @staticmethod def with_time_range_constraint( @@ -220,7 +226,23 @@ def __init__( # noqa: D107 self._semantic_model_lookup = semantic_model_lookup self._join_evaluator = JoinDataflowOutputValidator(semantic_model_lookup) - def add_time_range_constraint( + def apply_matching_filter_predicates( + self, + source_nodes: Sequence[DataflowPlanNode], + predicate_pushdown_state: PredicatePushdownState, + metric_time_dimension_reference: TimeDimensionReference, + ) -> Sequence[DataflowPlanNode]: + """Adds filter predicate nodes to the input nodes as appropriate.""" + if predicate_pushdown_state.has_time_range_constraint_to_push_down: + source_nodes = self._add_time_range_constraint( + source_nodes=source_nodes, + metric_time_dimension_reference=metric_time_dimension_reference, + time_range_constraint=predicate_pushdown_state.time_range_constraint, + ) + + return source_nodes + + def _add_time_range_constraint( self, source_nodes: Sequence[DataflowPlanNode], metric_time_dimension_reference: TimeDimensionReference, diff --git a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py index 8969e49384..ff385fe1b6 100644 --- a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py +++ b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py @@ -23,13 +23,13 @@ def test_time_range_pushdown_enabled_states(fully_enabled_pushdown_state: Predic ) enabled_states = { - "fully enabled": fully_enabled_pushdown_state.is_pushdown_enabled_for_time_range_constraint, - "enabled for time range only": time_range_only_state.is_pushdown_enabled_for_time_range_constraint, + "fully enabled": fully_enabled_pushdown_state.has_time_range_constraint_to_push_down, + "enabled for time range only": time_range_only_state.has_time_range_constraint_to_push_down, } assert all(list(enabled_states.values())), ( "Expected pushdown to be enabled for pushdown state with time range constraint and global pushdown enabled, " - "but some states returned False for is_pushdown_enabled_for_time_range_constraints.\n" + "but some states returned False for has_time_range_constraint_to_push_down.\n" f"Pushdown enabled states: {enabled_states}\n" f"Fully enabled state: {fully_enabled_pushdown_state}\n" f"Time range only state: {time_range_only_state}"