Use pushdown params to disable time range constraint pushdown #1216

Merged
49 changes: 37 additions & 12 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -82,7 +82,11 @@
from metricflow.dataflow.nodes.write_to_table import WriteToResultTableNode
from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer
from metricflow.dataset.dataset_classes import DataSet
from metricflow.plan_conversion.node_processor import PredicatePushdownParameters, PreJoinNodeProcessor
from metricflow.plan_conversion.node_processor import (
PredicateInputType,
PredicatePushdownParameters,
PreJoinNodeProcessor,
)
from metricflow.sql.sql_table import SqlTable

logger = logging.getLogger(__name__)
@@ -237,21 +241,30 @@ def _build_aggregated_conversion_node(
constant_properties: Optional[Sequence[ConstantPropertyInput]] = None,
) -> DataflowPlanNode:
"""Builds a node that contains aggregated values of conversions and opportunities."""
# Pushdown parameters vary with conversion metrics due to the way the time joins are applied.
# Due to other outstanding issues with conversion metric filters, we disable predicate
# pushdown for any filter parameter set that is not part of the original time range constraint
# implementation.
disabled_pushdown_parameters = PredicatePushdownParameters.with_pushdown_disabled()
time_range_only_pushdown_parameters = PredicatePushdownParameters(
time_range_constraint=predicate_pushdown_params.time_range_constraint,
pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]),
)

# Build measure recipes
base_required_linkable_specs, _ = self.__get_required_and_extraneous_linkable_specs(
queried_linkable_specs=queried_linkable_specs,
filter_specs=base_measure_spec.filter_specs,
)
base_measure_recipe = self._find_dataflow_recipe(
measure_spec_properties=self._build_measure_spec_properties([base_measure_spec.measure_spec]),
predicate_pushdown_params=predicate_pushdown_params,
predicate_pushdown_params=time_range_only_pushdown_parameters,
linkable_spec_set=base_required_linkable_specs,
)
logger.info(f"Recipe for base measure aggregation:\n{mf_pformat(base_measure_recipe)}")
conversion_measure_recipe = self._find_dataflow_recipe(
measure_spec_properties=self._build_measure_spec_properties([conversion_measure_spec.measure_spec]),
# TODO - Pushdown: Evaluate the potential for applying time constraints and other predicates for conversion
predicate_pushdown_params=PredicatePushdownParameters(time_range_constraint=None),
predicate_pushdown_params=disabled_pushdown_parameters,
linkable_spec_set=LinkableSpecSet(),
)
logger.info(f"Recipe for conversion measure aggregation:\n{mf_pformat(conversion_measure_recipe)}")
@@ -268,7 +281,7 @@ def _build_aggregated_conversion_node(
aggregated_base_measure_node = self.build_aggregated_measure(
metric_input_measure_spec=base_measure_spec,
queried_linkable_specs=queried_linkable_specs,
predicate_pushdown_params=predicate_pushdown_params,
predicate_pushdown_params=time_range_only_pushdown_parameters,
)

# Build unaggregated conversions source node
@@ -337,10 +350,14 @@ def _build_aggregated_conversion_node(
required_local_linkable_specs=base_measure_recipe.required_local_linkable_specs,
join_linkable_instances_recipes=base_measure_recipe.join_linkable_instances_recipes,
)
# TODO: Refine conversion metric configuration to fit into the standard dataflow plan building model
# In this case we override the measure recipe, which currently results in us bypassing predicate pushdown.
# Rather than relying on happenstance in the way the code is laid out, we also explicitly disable
# predicate pushdown until we are ready to fully support it for conversion metrics.
aggregated_conversions_node = self.build_aggregated_measure(
metric_input_measure_spec=conversion_measure_spec,
queried_linkable_specs=queried_linkable_specs,
predicate_pushdown_params=predicate_pushdown_params,
predicate_pushdown_params=disabled_pushdown_parameters,
measure_recipe=recipe_with_join_conversion_source_node,
)

@@ -492,11 +509,10 @@ def _build_derived_metric_output_node(
if not metric_spec.has_time_offset:
filter_specs.extend(metric_spec.filter_specs)

# TODO - Pushdown: use parameters to disable pushdown operations instead of clobbering the constraints
metric_pushdown_params = (
predicate_pushdown_params
if not metric_spec.has_time_offset
else PredicatePushdownParameters(time_range_constraint=None)
else PredicatePushdownParameters.with_pushdown_disabled()
)

parent_nodes.append(
@@ -867,7 +883,10 @@ def _find_dataflow_recipe(
node_data_set_resolver=self._node_data_set_resolver,
)
# TODO - Pushdown: Encapsulate this in the node processor
if predicate_pushdown_params.time_range_constraint:
if (
predicate_pushdown_params.is_pushdown_enabled_for_time_range_constraint
and predicate_pushdown_params.time_range_constraint
):
candidate_nodes_for_left_side_of_join = list(
node_processor.add_time_range_constraint(
source_nodes=candidate_nodes_for_left_side_of_join,
@@ -1298,15 +1317,21 @@ def _build_aggregated_measure_from_measure_source_node(
+ indent(f"\nmeasure_specs:\n{mf_pformat([measure_spec])}")
+ indent(f"\nevaluation:\n{mf_pformat(required_linkable_specs)}")
)

# TODO - Pushdown: Update this to be more robust to additional pushdown parameters
measure_time_constraint = (
(cumulative_metric_adjusted_time_constraint or predicate_pushdown_params.time_range_constraint)
# If joining to time spine for time offset, constraints will be applied after that join.
if not before_aggregation_time_spine_join_description
else None
)
measure_pushdown_params = PredicatePushdownParameters(time_range_constraint=measure_time_constraint)
if measure_time_constraint is None:
measure_pushdown_params = PredicatePushdownParameters.without_time_range_constraint(
predicate_pushdown_params
)
else:
measure_pushdown_params = PredicatePushdownParameters.with_time_range_constraint(
predicate_pushdown_params, time_range_constraint=measure_time_constraint
)

find_recipe_start_time = time.time()
measure_recipe = self._find_dataflow_recipe(
measure_spec_properties=measure_properties,
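For orientation, here is a minimal sketch of the three parameter shapes the builder now constructs. It is illustrative rather than part of the diff, and assumes metricflow and metricflow_semantics are importable as in the changed files:

from metricflow_semantics.filters.time_constraint import TimeRangeConstraint

from metricflow.plan_conversion.node_processor import (
    PredicateInputType,
    PredicatePushdownParameters,
)

# Incoming parameters, as received by _build_aggregated_conversion_node.
predicate_pushdown_params = PredicatePushdownParameters(
    time_range_constraint=TimeRangeConstraint.all_time(),
)

# Base measure branch: preserve only the original time range constraint behavior.
time_range_only_params = PredicatePushdownParameters(
    time_range_constraint=predicate_pushdown_params.time_range_constraint,
    pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]),
)
assert time_range_only_params.is_pushdown_enabled_for_time_range_constraint

# Conversion measure branch: disable pushdown outright instead of clobbering
# the time range constraint with None.
disabled_params = PredicatePushdownParameters.with_pushdown_disabled()
assert disabled_params.is_pushdown_disabled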
127 changes: 120 additions & 7 deletions metricflow/plan_conversion/node_processor.py
@@ -1,9 +1,11 @@
from __future__ import annotations

import dataclasses
import logging
from dataclasses import dataclass
from typing import List, Optional, Sequence, Set
from enum import Enum
from typing import FrozenSet, List, Optional, Sequence, Set

from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
from dbt_semantic_interfaces.references import EntityReference, TimeDimensionReference
from metricflow_semantics.filters.time_constraint import TimeRangeConstraint
from metricflow_semantics.mf_logging.pretty_print import mf_pformat
@@ -28,7 +30,7 @@
logger = logging.getLogger(__name__)


@dataclass(frozen=True)
@dataclasses.dataclass(frozen=True)
class MultiHopJoinCandidateLineage:
"""Describes how the multi-hop join candidate was formed.

@@ -48,7 +50,7 @@ class MultiHopJoinCandidateLineage:
join_second_node_by_entity: LinklessEntitySpec


@dataclass(frozen=True)
@dataclasses.dataclass(frozen=True)
class MultiHopJoinCandidate:
"""A candidate node containing linkable specs that is join of other nodes. It's used to resolve multi-hop queries.

@@ -59,14 +61,125 @@ class MultiHopJoinCandidate:
lineage: MultiHopJoinCandidateLineage


@dataclass(frozen=True)
class PredicateInputType(Enum):
"""Enumeration of predicate input types we may encounter in where filters.

This is primarily used for describing when predicate pushdown may operate in a dataflow plan, and is necessary
for holistic checks against the set of potentially enabled pushdown operations. For example, in the scenario
where we only allow time range updates, we must carefully override the other pushdown properties.

This also allows us to disable pushdown for things like time dimension filters in cases where we might
accidentally censor input data.
"""

CATEGORICAL_DIMENSION = "categorical_dimension"
ENTITY = "entity"
TIME_DIMENSION = "time_dimension"
TIME_RANGE_CONSTRAINT = "time_range_constraint"


@dataclasses.dataclass(frozen=True)
class PredicatePushdownParameters:
"""Container class for managing filter predicate pushdown.
"""Container class for managing information about whether and how to do filter predicate pushdown.

Stores time constraint information for applying pre-join time filters.
The time_range_constraint property holds the time window for setting up a time range filter expression.
"""

time_range_constraint: Optional[TimeRangeConstraint]
pushdown_enabled_types: FrozenSet[PredicateInputType] = frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT])

def __post_init__(self) -> None:
"""Validation to ensure pushdown properties are configured correctly.

In particular, this asserts that cases where pushdown is disabled cannot leak pushdown operations via
outside property access - if pushdown is disabled, no further pushdown operations of any kind are allowed
on that particular code branch. It also asserts that unsupported pushdown scenarios are not configured.
"""
invalid_types: Set[PredicateInputType] = set()

for input_type in self.pushdown_enabled_types:
if (
input_type is PredicateInputType.CATEGORICAL_DIMENSION
or input_type is PredicateInputType.ENTITY
or input_type is PredicateInputType.TIME_DIMENSION
):
invalid_types.add(input_type)
elif input_type is PredicateInputType.TIME_RANGE_CONSTRAINT:
continue
else:
assert_values_exhausted(input_type)

assert len(invalid_types) == 0, (
"Unsupported predicate input type found in pushdown state configuration! We currently only support "
"predicate pushdown for a subset of possible predicate input types (i.e., types of semantic manifest "
"elements, such as entities and time dimensions, referenced in filter predicates), but this was enabled "
f"for {self.pushdown_enabled_types}, which includes the following invalid types: {invalid_types}."
)

if self.is_pushdown_disabled:
# TODO: Include where filter specs when they are added to this class
assert self.time_range_constraint is None, (
"Invalid pushdown parameter configuration! Disabled pushdown parameters cannot have properties "
"set that may lead to improper access and use in other contexts, as that can lead to unintended "
"filtering operations in cases where these properties are accessed without appropriate checks against "
"pushdown configuration. The following properties should all have None values:\n"
f"time_range_constraint: {self.time_range_constraint}"
)

@property
def is_pushdown_disabled(self) -> bool:
"""Convenience accessor for checking if pushdown should always be skipped."""
return len(self.pushdown_enabled_types) == 0

@property
def is_pushdown_enabled_for_time_range_constraint(self) -> bool:
"""Convenience accessor for checking if pushdown is enabled for time range constraints.

Note: this time range enabled state is a backwards compatibility shim for use with conversion metrics while
we determine how best to support predicate pushdown for conversion metrics. It may have longer term utility,
but ideally we'd collapse this with the more general time dimension filter input scenarios.
"""
return PredicateInputType.TIME_RANGE_CONSTRAINT in self.pushdown_enabled_types

@staticmethod
def with_time_range_constraint(
original_pushdown_params: PredicatePushdownParameters, time_range_constraint: TimeRangeConstraint
) -> PredicatePushdownParameters:
"""Factory method for adding or updating a time range constraint input to a set of pushdown parameters.

This allows for temporarily overriding a time range constraint with an adjusted one, or enabling a time
range constraint filter if one becomes available mid-stream during dataflow plan construction.
"""
pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.union(
{PredicateInputType.TIME_RANGE_CONSTRAINT}
)
return PredicatePushdownParameters(
time_range_constraint=time_range_constraint, pushdown_enabled_types=pushdown_enabled_types
)

@staticmethod
def without_time_range_constraint(
original_pushdown_params: PredicatePushdownParameters,
) -> PredicatePushdownParameters:
"""Factory method for removing the time range constraint, if any, from the given set of pushdown parameters."""
pushdown_enabled_types = original_pushdown_params.pushdown_enabled_types.difference(
{PredicateInputType.TIME_RANGE_CONSTRAINT}
)
return PredicatePushdownParameters(time_range_constraint=None, pushdown_enabled_types=pushdown_enabled_types)

@staticmethod
def with_pushdown_disabled() -> PredicatePushdownParameters:
"""Factory method for disabling predicate pushdown for all parameter types.

This is useful where a plan branches and pushdown should be disabled along one branch while the other
remains eligible. For example, a join linkage where one side of the join contains an unsupported
configuration might send a disabled copy of the pushdown parameters down that path while retaining
pushdown eligibility along the other path.
"""
return PredicatePushdownParameters(
time_range_constraint=None,
pushdown_enabled_types=frozenset(),
)


class PreJoinNodeProcessor:
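As a usage sketch for the factory methods above (illustrative, not from the PR): because the dataclass is frozen, each factory call returns a new parameter set rather than mutating the original.

from metricflow_semantics.filters.time_constraint import TimeRangeConstraint

from metricflow.plan_conversion.node_processor import PredicatePushdownParameters

base_params = PredicatePushdownParameters(time_range_constraint=None)

# Add (or override) a time range constraint mid-stream; a new instance is
# returned because the dataclass is frozen.
constrained = PredicatePushdownParameters.with_time_range_constraint(
    base_params, time_range_constraint=TimeRangeConstraint.all_time()
)
assert constrained.is_pushdown_enabled_for_time_range_constraint
assert constrained.time_range_constraint == TimeRangeConstraint.all_time()

# Strip the constraint again; base_params itself is untouched.
unconstrained = PredicatePushdownParameters.without_time_range_constraint(constrained)
assert unconstrained.time_range_constraint is None
assert not unconstrained.is_pushdown_enabled_for_time_range_constraint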
42 changes: 42 additions & 0 deletions tests_metricflow/dataflow/builder/test_predicate_pushdown.py
@@ -0,0 +1,42 @@
from __future__ import annotations

import pytest
from metricflow_semantics.filters.time_constraint import TimeRangeConstraint

from metricflow.plan_conversion.node_processor import PredicateInputType, PredicatePushdownParameters


@pytest.fixture
def all_pushdown_params() -> PredicatePushdownParameters:
"""Tests a valid configuration with all predicate properties set and pushdown fully enabled."""
params = PredicatePushdownParameters(
time_range_constraint=TimeRangeConstraint.all_time(),
)
return params


def test_time_range_pushdown_enabled_states(all_pushdown_params: PredicatePushdownParameters) -> None:
"""Tests pushdown enabled check for time range pushdown operations."""
time_range_only_params = PredicatePushdownParameters(
time_range_constraint=TimeRangeConstraint.all_time(),
pushdown_enabled_types=frozenset([PredicateInputType.TIME_RANGE_CONSTRAINT]),
)

enabled_states = {
"fully enabled": all_pushdown_params.is_pushdown_enabled_for_time_range_constraint,
"enabled for time range only": time_range_only_params.is_pushdown_enabled_for_time_range_constraint,
}

assert all(list(enabled_states.values())), (
"Expected pushdown to be enabled for pushdown params with time range constraint and global pushdown enabled, "
f"but some params returned False for is_pushdown_enabled.\nPushdown enabled states: {enabled_states}\n"
f"All params: {all_pushdown_params}\nTime range only params: {time_range_only_params}"
)


def test_invalid_disabled_pushdown_params() -> None:
"""Tests checks for invalid param configuration on disabled pushdown parameters."""
with pytest.raises(AssertionError, match="Disabled pushdown parameters cannot have properties set"):
PredicatePushdownParameters(
time_range_constraint=TimeRangeConstraint.all_time(), pushdown_enabled_types=frozenset()
)
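The __post_init__ validation also rejects enabling pushdown for input types that are not yet supported. A companion test along the following lines (a sketch, not included in this PR) would cover that branch:

import pytest
from metricflow_semantics.filters.time_constraint import TimeRangeConstraint

from metricflow.plan_conversion.node_processor import PredicateInputType, PredicatePushdownParameters


def test_unsupported_pushdown_input_type() -> None:
    """Tests that enabling pushdown for a not-yet-supported input type fails validation."""
    with pytest.raises(AssertionError, match="Unsupported predicate input type"):
        PredicatePushdownParameters(
            time_range_constraint=TimeRangeConstraint.all_time(),
            pushdown_enabled_types=frozenset([PredicateInputType.CATEGORICAL_DIMENSION]),
        )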