Skip to content

Commit

Permalink
Use pushdown params to disable time range constraint pushdown
Browse files Browse the repository at this point in the history
The predicate pushdown operations for time range constraints currently
rely on None/not-None state to determine whether or not to push the
ConstrainTimeRange filter node to the source. With the switch to
pushdown params this None/not-None state gets tenuous, as future
updates will need to do things like manage time range constraints,
other time dimension filters, and categorical dimension (and entity)
filters, and relying on externalizing None/not-None combinations for
these various filter types will be challenging.

This change encapsulates the enabling and disabling of time range
constraints inside of PredicatePushdownParams. At the moment, in
order to maintain the existing behavior, we simply internalize the
None/not-None behavior for time range constraints. This will also
allow us to easily retain pushdown processing for categorical dimensions
even when time constraint filters should not be applied, and
gradually centralize those controls as we streamline the callsites.

There is added complexity to this change because of two things.

1. Time constraint updating is scattered around in the DataflowPlanBuilder
2. Conversion metrics currently do predicate pushdown for time constraints

The first of these will be addressed later.
Since we cannot reliably support general predicate pushdown for conversion
metrics, we need to allow for a time-range-only pushdown operation. Today
this is equivalent to pushdown enabled, but that will change shortly.
  • Loading branch information
tlento committed May 15, 2024
1 parent bfb75ba commit 9435008
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 19 deletions.
41 changes: 29 additions & 12 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@
from metricflow.dataflow.nodes.write_to_table import WriteToResultTableNode
from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer
from metricflow.dataset.dataset_classes import DataSet
from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PreJoinNodeProcessor
from metricflow.plan_conversion.node_processor import (
PredicatePushdownParameters,
PredicatePushdownState,
PreJoinNodeProcessor,
)
from metricflow.sql.sql_table import SqlTable

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -238,21 +242,30 @@ def _build_aggregated_conversion_node(
constant_properties: Optional[Sequence[ConstantPropertyInput]] = None,
) -> BaseOutput:
"""Builds a node that contains aggregated values of conversions and opportunities."""
# Pushdown parameters vary with conversion metrics due to the way the time joins are applied.
# Due to other outstanding issues with conversion metric filters, we disable predicate
# pushdown for any filter parameter set that is not part of the original time range constraint
# implementation.
disabled_pushdown_parameters = PredicatePushdownParameters.with_pushdown_disabled()
time_range_only_pushdown_parameters = PredicatePushdownParameters(
time_range_constraint=predicate_pushdown_params.time_range_constraint,
pushdown_state=PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY,
)

# Build measure recipes
base_required_linkable_specs, _ = self.__get_required_and_extraneous_linkable_specs(
queried_linkable_specs=queried_linkable_specs,
filter_specs=base_measure_spec.filter_specs,
)
base_measure_recipe = self._find_dataflow_recipe(
measure_spec_properties=self._build_measure_spec_properties([base_measure_spec.measure_spec]),
predicate_pushdown_params=predicate_pushdown_params,
predicate_pushdown_params=time_range_only_pushdown_parameters,
linkable_spec_set=base_required_linkable_specs,
)
logger.info(f"Recipe for base measure aggregation:\n{mf_pformat(base_measure_recipe)}")
conversion_measure_recipe = self._find_dataflow_recipe(
measure_spec_properties=self._build_measure_spec_properties([conversion_measure_spec.measure_spec]),
# TODO - Pushdown: Evaluate the potential for applying time constraints and other predicates for conversion
predicate_pushdown_params=PredicatePushdownParameters(time_range_constraint=None),
predicate_pushdown_params=disabled_pushdown_parameters,
linkable_spec_set=LinkableSpecSet(),
)
logger.info(f"Recipe for conversion measure aggregation:\n{mf_pformat(conversion_measure_recipe)}")
Expand All @@ -269,7 +282,7 @@ def _build_aggregated_conversion_node(
aggregated_base_measure_node = self.build_aggregated_measure(
metric_input_measure_spec=base_measure_spec,
queried_linkable_specs=queried_linkable_specs,
predicate_pushdown_params=predicate_pushdown_params,
predicate_pushdown_params=time_range_only_pushdown_parameters,
)

# Build unaggregated conversions source node
Expand Down Expand Up @@ -338,10 +351,14 @@ def _build_aggregated_conversion_node(
required_local_linkable_specs=base_measure_recipe.required_local_linkable_specs,
join_linkable_instances_recipes=base_measure_recipe.join_linkable_instances_recipes,
)
# TODO: Refine conversion metric configuration to fit into the standard dataflow plan building model
# In this case we override the measure recipe, which currently results in us bypassing predicate pushdown
# Rather than relying on happenstance in the way the code is laid out we also explicitly disable
# predicate pushdown until we are ready to fully support it for conversion metrics
aggregated_conversions_node = self.build_aggregated_measure(
metric_input_measure_spec=conversion_measure_spec,
queried_linkable_specs=queried_linkable_specs,
predicate_pushdown_params=predicate_pushdown_params,
predicate_pushdown_params=disabled_pushdown_parameters,
measure_recipe=recipe_with_join_conversion_source_node,
)

Expand Down Expand Up @@ -493,11 +510,10 @@ def _build_derived_metric_output_node(
if not metric_spec.has_time_offset:
filter_specs.extend(metric_spec.filter_specs)

# TODO - Pushdown: use parameters to disable pushdown operations instead of clobbering the constraints
metric_pushdown_params = (
predicate_pushdown_params
if not metric_spec.has_time_offset
else PredicatePushdownParameters(time_range_constraint=None)
else PredicatePushdownParameters.with_pushdown_disabled()
)

parent_nodes.append(
Expand Down Expand Up @@ -868,7 +884,7 @@ def _find_dataflow_recipe(
node_data_set_resolver=self._node_data_set_resolver,
)
# TODO - Pushdown: Encapsulate this in the node processor
if predicate_pushdown_params.time_range_constraint:
if predicate_pushdown_params.is_pushdown_enabled and predicate_pushdown_params.time_range_constraint:
candidate_nodes_for_left_side_of_join = list(
node_processor.add_time_range_constraint(
source_nodes=candidate_nodes_for_left_side_of_join,
Expand Down Expand Up @@ -1299,15 +1315,16 @@ def _build_aggregated_measure_from_measure_source_node(
+ indent(f"\nmeasure_specs:\n{mf_pformat([measure_spec])}")
+ indent(f"\nevaluation:\n{mf_pformat(required_linkable_specs)}")
)

# TODO - Pushdown: Update this to be more robust to additional pushdown parameters
measure_time_constraint = (
(cumulative_metric_adjusted_time_constraint or predicate_pushdown_params.time_range_constraint)
# If joining to time spine for time offset, constraints will be applied after that join.
if not before_aggregation_time_spine_join_description
else None
)
measure_pushdown_params = PredicatePushdownParameters(time_range_constraint=measure_time_constraint)
measure_pushdown_params = PredicatePushdownParameters.with_time_range_constraint(
predicate_pushdown_params, time_range_constraint=measure_time_constraint
)

find_recipe_start_time = time.time()
measure_recipe = self._find_dataflow_recipe(
measure_spec_properties=measure_properties,
Expand Down
113 changes: 106 additions & 7 deletions metricflow/plan_conversion/node_processor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import annotations

import dataclasses
import logging
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Sequence, Set

from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
from dbt_semantic_interfaces.references import EntityReference, TimeDimensionReference
from metricflow_semantics.filters.time_constraint import TimeRangeConstraint
from metricflow_semantics.mf_logging.pretty_print import mf_pformat
Expand All @@ -28,7 +30,7 @@
logger = logging.getLogger(__name__)


@dataclass(frozen=True)
@dataclasses.dataclass(frozen=True)
class MultiHopJoinCandidateLineage:
"""Describes how the multi-hop join candidate was formed.
Expand All @@ -48,7 +50,7 @@ class MultiHopJoinCandidateLineage:
join_second_node_by_entity: LinklessEntitySpec


@dataclass(frozen=True)
@dataclasses.dataclass(frozen=True)
class MultiHopJoinCandidate:
"""A candidate node containing linkable specs that is join of other nodes. It's used to resolve multi-hop queries.
Expand All @@ -59,14 +61,111 @@ class MultiHopJoinCandidate:
lineage: MultiHopJoinCandidateLineage


@dataclass(frozen=True)
class PredicatePushdownState(Enum):
    """Enumeration of constraint states describing when predicate pushdown may operate in a dataflow plan.

    This is necessary for holistic checks against the set of potentially enabled pushdown operations, because the
    scenario where we only allow time range updates requires careful overriding of other pushdown properties.

    Note: the time_range_only state is a backwards compatibility shim for use with conversion metrics while
    we determine how best to support predicate pushdown for conversion metrics. It may have longer term utility,
    but ideally we'd collapse this into a single enabled/disabled boolean property.
    """

    # No pushdown operations of any kind may be applied on this branch of the dataflow plan.
    DISABLED = "disabled"
    # All pushdown operations may be applied on this branch of the dataflow plan.
    FULLY_ENABLED = "fully_enabled"
    # Only time range constraint pushdown may be applied (the conversion metric compatibility shim noted above).
    ENABLED_FOR_TIME_RANGE_ONLY = "time_range_only"


@dataclasses.dataclass(frozen=True)
class PredicatePushdownParameters:
    """Container class for managing information about whether and how to do filter predicate pushdown.

    The time_range_constraint property holds the time window for setting up a time range filter expression, while
    pushdown_state describes which pushdown operations (if any) may consume the predicate properties stored here.
    """

    # Marker used in dataclass field metadata to tag fields carrying predicate information. Tagged fields must be
    # None whenever pushdown_state disallows their use; this is enforced in __post_init__.
    _PREDICATE_METADATA_KEY = "is_predicate_property"

    time_range_constraint: Optional[TimeRangeConstraint] = dataclasses.field(metadata={_PREDICATE_METADATA_KEY: True})
    pushdown_state: PredicatePushdownState = PredicatePushdownState.FULLY_ENABLED

    def __post_init__(self) -> None:
        """Validation to ensure pushdown properties are configured correctly.

        In particular, this asserts that cases where pushdown is disabled cannot leak pushdown operations via
        outside property access - if pushdown is disabled, no further pushdown operations of any kind are allowed
        on that particular code branch.
        """
        if self.pushdown_state is PredicatePushdownState.FULLY_ENABLED:
            return

        # All predicate-tagged fields are invalid (must be None) unless the state below carves out an exception.
        invalid_predicate_property_names = {
            field.name for field in dataclasses.fields(self) if field.metadata.get(self._PREDICATE_METADATA_KEY)
        }

        if self.pushdown_state is PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY:
            # We don't do validation for time range constraint configuration - having None set in this state is the
            # equivalent of disabling predicate pushdown, but that time constraint value might be updated later,
            # and so we do not block overrides to (or from) None to avoid meaningless bookkeeping at callsites.
            # Also, we keep the magic string name Python uses hidden in here instead of expanding access to it.
            invalid_predicate_property_names.remove("time_range_constraint")

        instance_configuration = dataclasses.asdict(self)
        invalid_disabled_properties = {
            property_name: value
            for property_name, value in instance_configuration.items()
            if property_name in invalid_predicate_property_names and value is not None
        }

        assert not invalid_disabled_properties, (
            "Invalid pushdown parameter configuration! Disabled pushdown parameters cannot have properties "
            "set that may lead to improper access and use in other contexts, as that can lead to unintended "
            "filtering operations in cases where these properties are accessed without appropriate checks against "
            "pushdown configuration. This indicates that pushdown is configured for limited scope operations, but "
            f"other predicate properties are set to non-None values.\nFull configuration: {instance_configuration}\n"
            f"Invalid predicate properties: {invalid_disabled_properties}"
        )

    @property
    def is_pushdown_enabled(self) -> bool:
        """Convenience accessor for checking whether any pushdown operations may be performed."""
        pushdown_state = self.pushdown_state
        if pushdown_state is PredicatePushdownState.DISABLED:
            return False
        elif pushdown_state is PredicatePushdownState.FULLY_ENABLED:
            return True
        elif pushdown_state is PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY:
            return True
        else:
            return assert_values_exhausted(pushdown_state)

    @staticmethod
    def with_time_range_constraint(
        original_pushdown_params: PredicatePushdownParameters, time_range_constraint: Optional[TimeRangeConstraint]
    ) -> PredicatePushdownParameters:
        """Factory method for overriding the time range constraint value in a given set of pushdown parameters.

        This allows for crude updates to the core time range constraint, including selectively disabling time range
        predicate pushdown in certain sub-branches of the dataflow plan, such as in complex cases involving time spine
        joins and cumulative metrics.

        The original pushdown_state is retained: a parameter set restricted to ENABLED_FOR_TIME_RANGE_ONLY stays
        restricted rather than being silently upgraded to FULLY_ENABLED, and a disabled set is returned unchanged.
        """
        if original_pushdown_params.is_pushdown_enabled:
            # dataclasses.replace preserves all other fields (including pushdown_state) and re-runs __post_init__
            # validation. Constructing a fresh instance here would reset the state to the FULLY_ENABLED default,
            # which could widen pushdown scope on time-range-only branches.
            return dataclasses.replace(original_pushdown_params, time_range_constraint=time_range_constraint)
        else:
            return original_pushdown_params

    @staticmethod
    def with_pushdown_disabled() -> PredicatePushdownParameters:
        """Factory method for disabling predicate pushdown for all parameter types.

        This is useful in cases where there is a branched path where pushdown should be disabled in one branch while the
        other may remain eligible. For example, a join linkage where one side of the join contains an unsupported
        configuration might send a disabled copy of the pushdown parameters down that path while retaining the potential
        for using another path.
        """
        return PredicatePushdownParameters(time_range_constraint=None, pushdown_state=PredicatePushdownState.DISABLED)


class PreJoinNodeProcessor:
Expand Down
54 changes: 54 additions & 0 deletions tests_metricflow/dataflow/builder/test_predicate_pushdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from __future__ import annotations

import dataclasses

import pytest
from metricflow_semantics.filters.time_constraint import TimeRangeConstraint

from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PredicatePushdownState


@pytest.fixture
def all_pushdown_params() -> PredicatePushdownParameters:
    """Tests a valid configuration with all predicate properties set and pushdown fully enabled."""
    pushdown_params = PredicatePushdownParameters(
        time_range_constraint=TimeRangeConstraint.all_time(), pushdown_state=PredicatePushdownState.FULLY_ENABLED
    )
    # Gather the fields tagged as predicate properties via their metadata, then map each to its current value.
    flagged_fields = [
        field
        for field in dataclasses.fields(pushdown_params)
        if field.metadata.get(pushdown_params._PREDICATE_METADATA_KEY)
    ]
    instance_values = dataclasses.asdict(pushdown_params)
    predicate_properties = {field.name: instance_values[field.name] for field in flagged_fields}
    assert all(value is not None for value in predicate_properties.values()), (
        "All predicate properties in this pushdown param instance should be set to something other than None. Found "
        f"one or more None values in property map: {predicate_properties}"
    )
    return pushdown_params


def test_time_range_pushdown_enabled_states(all_pushdown_params: PredicatePushdownParameters) -> None:
    """Tests pushdown enabled check for time range pushdown operations."""
    time_range_only_params = PredicatePushdownParameters(
        time_range_constraint=TimeRangeConstraint.all_time(),
        pushdown_state=PredicatePushdownState.ENABLED_FOR_TIME_RANGE_ONLY,
    )

    # Collect the enabled check result for each configuration under a descriptive label.
    enabled_states = {}
    enabled_states["fully enabled"] = all_pushdown_params.is_pushdown_enabled
    enabled_states["enabled for time range only"] = time_range_only_params.is_pushdown_enabled

    assert all(enabled_states.values()), (
        "Expected pushdown to be enabled for pushdown params with time range constraint and global pushdown enabled, "
        f"but some params returned False for is_pushdown_enabled.\nPushdown enabled states: {enabled_states}\n"
        f"All params: {all_pushdown_params}\nTime range only params: {time_range_only_params}"
    )


def test_invalid_disabled_pushdown_params() -> None:
    """Tests checks for invalid param configuration on disabled pushdown parameters."""
    expected_error = "Disabled pushdown parameters cannot have properties set"
    with pytest.raises(AssertionError, match=expected_error):
        PredicatePushdownParameters(
            time_range_constraint=TimeRangeConstraint.all_time(), pushdown_state=PredicatePushdownState.DISABLED
        )

0 comments on commit 9435008

Please sign in to comment.