From c9fd6dd41be4c711648ec0037dd3c844788c26ab Mon Sep 17 00:00:00 2001 From: tlento Date: Thu, 16 May 2024 23:06:07 -0700 Subject: [PATCH] Encapsulate time range constraint application in more general method The time range constraint is our currently supported predicate pushdown operation. The specific application of a time range constraint is inlined in the DataflowPlanBuilder. This moves it to a more general function, which will make it easier for us to handle other predicate pushdown states while keeping the conditional logic contained. Note this is effectively a no-op as the new function is currently a pass-through. This is borne out by the lack of changes in our snapshots, several of which cover the time range node. --- .../dataflow/builder/dataflow_plan_builder.py | 11 ++-- metricflow/plan_conversion/node_processor.py | 54 +++++++++++++------ .../builder/test_predicate_pushdown.py | 6 +-- 3 files changed, 45 insertions(+), 26 deletions(-) diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index c2ab75a64..b61fc9383 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -882,16 +882,13 @@ def _find_dataflow_recipe( semantic_model_lookup=self._semantic_model_lookup, node_data_set_resolver=self._node_data_set_resolver, ) - # TODO - Pushdown: Encapsulate this in the node processor - if ( - predicate_pushdown_state.is_pushdown_enabled_for_time_range_constraint - and predicate_pushdown_state.time_range_constraint - ): + + if predicate_pushdown_state.has_pushdown_potential: candidate_nodes_for_left_side_of_join = list( - node_processor.add_time_range_constraint( + node_processor.apply_matching_filter_predicates( source_nodes=candidate_nodes_for_left_side_of_join, + predicate_pushdown_state=predicate_pushdown_state, metric_time_dimension_reference=self._metric_time_dimension_reference, - time_range_constraint=predicate_pushdown_state.time_range_constraint, ) ) diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py index da471f4ff..9373cf0c6 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -125,30 +125,36 @@ def __post_init__(self) -> None: f"for {self.pushdown_enabled_types}, which includes the following invalid types: {invalid_types}." ) - if self.is_pushdown_disabled: - # TODO: Include where filter specs when they are added to this class and check state -> predicate pairs - assert self.time_range_constraint is None, ( - "Invalid pushdown state configuration! Disabled pushdown state objects cannot have properties " - "set that may lead to improper access and use in other contexts, as that can lead to unintended " - "filtering operations in cases where these properties are accessed without appropriate checks against " - "pushdown configuration. The following properties should all have None values:\n" - f"time_range_constraint: {self.time_range_constraint}" - ) + # TODO: Include where filter specs when they are added to this class + time_range_constraint_is_valid = ( + self.time_range_constraint is None + or PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types + ) + assert time_range_constraint_is_valid, ( + "Invalid pushdown state configuration! Disabled pushdown state objects cannot have properties " + "set that may lead to improper access and use in other contexts, as that can lead to unintended " + "filtering operations in cases where these properties are accessed without appropriate checks against " + "pushdown configuration. The following properties should all have None values:\n" + f"time_range_constraint: {self.time_range_constraint}" + ) @property - def is_pushdown_disabled(self) -> bool: - """Convenience accessor for checking if pushdown should always be skipped.""" - return len(self.pushdown_enabled_types) == 0 + def has_pushdown_potential(self) -> bool: + """Returns whether or not pushdown is enabled for a type with predicate candidates in place.""" + return self.has_time_range_constraint_to_push_down @property - def is_pushdown_enabled_for_time_range_constraint(self) -> bool: - """Convenience accessor for checking if pushdown is enabled for time range constraints. + def has_time_range_constraint_to_push_down(self) -> bool: + """Convenience accessor for checking if there is a time range constraint that can be pushed down. Note: this time range enabled state is a backwards compatibility shim for use with conversion metrics while we determine how best to support predicate pushdown for conversion metrics. It may have longer term utility, but ideally we'd collapse this with the more general time dimension filter input scenarios. """ - return PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types + return ( + PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types + and self.time_range_constraint is not None + ) @staticmethod def with_time_range_constraint( @@ -220,7 +226,23 @@ def __init__( # noqa: D107 self._semantic_model_lookup = semantic_model_lookup self._join_evaluator = JoinDataflowOutputValidator(semantic_model_lookup) - def add_time_range_constraint( + def apply_matching_filter_predicates( + self, + source_nodes: Sequence[DataflowPlanNode], + predicate_pushdown_state: PredicatePushdownState, + metric_time_dimension_reference: TimeDimensionReference, + ) -> Sequence[DataflowPlanNode]: + """Adds filter predicate nodes to the input nodes as appropriate.""" + if predicate_pushdown_state.has_time_range_constraint_to_push_down: + source_nodes = self._add_time_range_constraint( + source_nodes=source_nodes, + metric_time_dimension_reference=metric_time_dimension_reference, + time_range_constraint=predicate_pushdown_state.time_range_constraint, + ) + + return source_nodes + + def _add_time_range_constraint( self, source_nodes: Sequence[DataflowPlanNode], metric_time_dimension_reference: TimeDimensionReference, diff --git a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py index 8969e4938..ff385fe1b 100644 --- a/tests_metricflow/dataflow/builder/test_predicate_pushdown.py +++ b/tests_metricflow/dataflow/builder/test_predicate_pushdown.py @@ -23,13 +23,13 @@ def test_time_range_pushdown_enabled_states(fully_enabled_pushdown_state: Predic ) enabled_states = { - "fully enabled": fully_enabled_pushdown_state.is_pushdown_enabled_for_time_range_constraint, - "enabled for time range only": time_range_only_state.is_pushdown_enabled_for_time_range_constraint, + "fully enabled": fully_enabled_pushdown_state.has_time_range_constraint_to_push_down, + "enabled for time range only": time_range_only_state.has_time_range_constraint_to_push_down, } assert all(list(enabled_states.values())), ( "Expected pushdown to be enabled for pushdown state with time range constraint and global pushdown enabled, " - "but some states returned False for is_pushdown_enabled_for_time_range_constraints.\n" + "but some states returned False for has_time_range_constraint_to_push_down.\n" f"Pushdown enabled states: {enabled_states}\n" f"Fully enabled state: {fully_enabled_pushdown_state}\n" f"Time range only state: {time_range_only_state}"