From ae3811f692a440a230fcc819863ff48eef03584a Mon Sep 17 00:00:00 2001 From: Leo Meyerovich Date: Wed, 22 Apr 2026 19:17:14 -0700 Subject: [PATCH] GFQL: add scope-aware predicate pushdown barriers (#1190) --- CHANGELOG.md | 1 + graphistry/compute/gfql/cypher/lowering.py | 29 ++++++++++++- graphistry/compute/gfql/ir/compilation.py | 4 +- graphistry/compute/gfql/ir/pushdown_safety.py | 4 +- .../compute/gfql/passes/predicate_pushdown.py | 39 +++++++++++++---- graphistry/compute/gfql_unified.py | 2 +- .../compute/gfql/cypher/test_lowering.py | 8 ++++ .../gfql/test_predicate_pushdown_pass.py | 42 +++++++++++++++++++ 8 files changed, 115 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d89ae5246..6fe8a905a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - **GFQL / pass framework skeleton**: Added `graphistry/compute/gfql/passes/` with `LogicalPass`, `PassResult`, and deterministic `PassManager.run()` sequencing. The pass manager now invokes IR `verify()` after each pass and fails fast on invalid pass output. Wired a new logical-pass pipeline hook into `gfql()` execution between logical-plan and physical-planner stages using a default no-op pass configuration to preserve runtime behavior. Added focused tests for pass ordering, verifier-failure propagation, and runtime pipeline hook invocation (`test_pass_manager.py`, `test_runtime_physical_cutover.py`) (#1180). - **GFQL / predicate pushdown safety**: Added `graphistry/compute/gfql/ir/pushdown_safety.py` with three reusable utilities for `PredicatePushdownPass`: `is_null_rejecting(pred, null_extended_aliases)` — conservative syntactic heuristic returning True when a predicate references a null-extended alias (OPTIONAL MATCH) and does not use a null-safe form (IS NULL, IS NOT NULL, COALESCE, NULLIF); `is_null_safe` — inverse; `with_barrier_blocks_pushdown(scope_stack, pred_refs)` — returns True when a WITH-clause `ScopeFrame` prevents backward predicate movement for the given reference set. All three exported from `ir/__init__.py`. 41 unit tests (#1181). - **GFQL / predicate pushdown rewrite**: Added `PredicatePushdownPass` implementation in `graphistry/compute/gfql/passes/predicate_pushdown.py` and wired it into logical planning route execution. The pass rewrites `Filter(input=PatternMatch(...))` by pushing safe predicates into `PatternMatch.predicates`, keeps residual filters for partial-push cases, and blocks null-rejecting pushdown into optional arms using existing safety helpers. Added focused pass tests and a lowering-route integration assertion (`test_predicate_pushdown_pass.py`, `cypher/test_lowering.py`) (#1187). +- **GFQL / scope-aware pushdown barriers**: Threaded binder scope metadata (`BoundIR.scope_stack`) into runtime logical-pass context via `CompiledCypherExecutionExtras` and `PlanContext`, and updated `PredicatePushdownPass` to enforce `with_barrier_blocks_pushdown()` using real scope data before moving conjuncts into `PatternMatch.predicates`. Added targeted regressions for blocked vs allowed WITH-boundary movement and compile-route scope metadata threading (`test_predicate_pushdown_pass.py`, `cypher/test_lowering.py`) (#1190). - **GFQL / remote wire migration (M3 follow-up)**: `chain_remote.py` remote Cypher string path no longer imports or dispatches on `CompiledCypher*` classes for wire serialization. It now validates/serializes structural compiled-query shapes (`chain`, `graph_bindings`, `procedure_call`, `use_ref`) so remote wire payload generation is decoupled from compiler IR class identity while preserving existing Let and CALL wire formats. Added parity tests in `test_chain_remote_v2.py` for structural fake compiled-query inputs (including Let bindings/result serialization and structural union rejection) (#1168). - **GFQL / M3 compatibility-deletion gate (PR3 slice)**: Cypher public compatibility surfaces now mark `compile_cypher()` and legacy `CompiledCypher*` exports as deprecated while retaining runtime compatibility. `graphistry.compute.gfql.cypher` now serves deprecated `CompiledCypherProcedureCall`, `CompiledCypherQuery`, `CompiledCypherUnionQuery`, and `compile_cypher_query` via lazy compatibility accessors with explicit deprecation warnings; `compile_cypher()` now emits a deprecation warning and migration guidance (`g.gfql(..., language="cypher")`, `cypher_to_gfql(...)`). API docs now describe `compile_cypher()` as deprecated/internal-shape oriented. Deferred boundaries remain unchanged: remote-wire migration stays tracked in #1168 and hard removal/versioned API cleanup stays tracked in #1169 (#1174). - **GFQL / M3 runtime cutover to PhysicalPlanner**: `gfql()` compiled-query execution in `graphistry/compute/gfql_unified.py` now dispatches through `PhysicalPlanner.plan(...)` for planned logical routes, then executes via physical operator wrappers (`same_path`, `wavefront`, `row_pipeline`). Added bounded compatibility shims for currently-required lanes (`CALL`-backed compiled queries and connected OPTIONAL wavefront payloads), and explicit validation failure when a planned wavefront route lacks executable join payload metadata (no silent unmatched fallback). Added focused runtime cutover tests in `graphistry/tests/compute/gfql/test_runtime_physical_cutover.py` covering planner invocation, optional-wavefront parity, compatibility-shim continuity, and explicit wavefront mismatch failure behavior (#1173). diff --git a/graphistry/compute/gfql/cypher/lowering.py b/graphistry/compute/gfql/cypher/lowering.py index a12b192185..fa3d54e655 100644 --- a/graphistry/compute/gfql/cypher/lowering.py +++ b/graphistry/compute/gfql/cypher/lowering.py @@ -32,7 +32,7 @@ from graphistry.compute.chain import Chain from graphistry.compute.exceptions import ErrorCode, GFQLValidationError from graphistry.compute.gfql.frontends.cypher.binder import FrontendBinder -from graphistry.compute.gfql.ir.bound_ir import BoundIR +from graphistry.compute.gfql.ir.bound_ir import BoundIR, ScopeFrame from graphistry.compute.gfql.ir.compilation import PlanContext from graphistry.compute.gfql.ir.logical_plan import ( Join as LogicalJoin, @@ -146,6 +146,7 @@ class CompiledCypherExecutionExtras: optional_reentry: bool = False scalar_reentry_alias: Optional[str] = None scalar_reentry_columns: Tuple[str, ...] = () + scope_stack: Tuple[ScopeFrame, ...] = () logical_plan: Optional[LogicalPlan] = None logical_plan_defer_reason: Optional[str] = None @@ -204,6 +205,10 @@ def scalar_reentry_alias(self) -> Optional[str]: def scalar_reentry_columns(self) -> Tuple[str, ...]: return () if self.execution_extras is None else self.execution_extras.scalar_reentry_columns + @property + def scope_stack(self) -> Tuple[ScopeFrame, ...]: + return () if self.execution_extras is None else self.execution_extras.scope_stack + @property def logical_plan(self) -> Optional[LogicalPlan]: return None if self.execution_extras is None else self.execution_extras.logical_plan @@ -303,6 +308,7 @@ def _normalize_execution_extras( and execution_extras.optional_reentry is False and execution_extras.scalar_reentry_alias is None and execution_extras.scalar_reentry_columns == () + and execution_extras.scope_stack == () and execution_extras.logical_plan is None and execution_extras.logical_plan_defer_reason is None ): @@ -337,6 +343,7 @@ def _execution_extras_with( optional_reentry: bool = False, scalar_reentry_alias: Optional[str] = None, scalar_reentry_columns: Tuple[str, ...] = (), + scope_stack: Tuple[ScopeFrame, ...] = (), logical_plan: Optional[LogicalPlan] = None, logical_plan_defer_reason: Optional[str] = None, ) -> Optional[CompiledCypherExecutionExtras]: @@ -351,6 +358,7 @@ def _execution_extras_with( optional_reentry=optional_reentry, scalar_reentry_alias=scalar_reentry_alias, scalar_reentry_columns=scalar_reentry_columns, + scope_stack=scope_stack, logical_plan=logical_plan, logical_plan_defer_reason=logical_plan_defer_reason, ) @@ -8364,6 +8372,7 @@ def _attach_logical_plan_route( optional_reentry=result.optional_reentry, scalar_reentry_alias=result.scalar_reentry_alias, scalar_reentry_columns=result.scalar_reentry_columns, + scope_stack=result.scope_stack, logical_plan=effective_logical_plan, logical_plan_defer_reason=effective_defer_reason, ), @@ -8430,6 +8439,7 @@ def compile_cypher_query( compiled_bindings = () logical_plan: Optional[LogicalPlan] = None logical_plan_defer_reason: Optional[str] = None + _bound_scope_stack: Tuple[ScopeFrame, ...] = () def _attach_graph_context(result: CompiledCypherQuery) -> CompiledCypherQuery: out = result @@ -8437,6 +8447,22 @@ def _attach_graph_context(result: CompiledCypherQuery) -> CompiledCypherQuery: out = result else: out = replace(result, graph_bindings=compiled_bindings, use_ref=_use_ref) + out = replace( + out, + execution_extras=_execution_extras_with( + out, + connected_optional_match=out.connected_optional_match, + connected_match_join=out.connected_match_join, + query_graph=out.query_graph, + start_nodes_query=out.start_nodes_query, + optional_reentry=out.optional_reentry, + scalar_reentry_alias=out.scalar_reentry_alias, + scalar_reentry_columns=out.scalar_reentry_columns, + scope_stack=_bound_scope_stack, + logical_plan=out.logical_plan, + logical_plan_defer_reason=out.logical_plan_defer_reason, + ), + ) return _attach_logical_plan_route( out, logical_plan=logical_plan, @@ -8452,6 +8478,7 @@ def _attach_graph_context(result: CompiledCypherQuery) -> CompiledCypherQuery: # Re-bind after normalization so scope and semantic metadata reflect the # lowered query shape consumed by downstream lowering decisions. bound_ir = FrontendBinder().bind(query, PlanContext()) + _bound_scope_stack = tuple(bound_ir.scope_stack) bound_context = _build_bound_lowering_context(bound_ir=bound_ir, params=params) params = bound_context.params _reject_unsupported_where_expr_forms(query) diff --git a/graphistry/compute/gfql/ir/compilation.py b/graphistry/compute/gfql/ir/compilation.py index 6835426c83..6cba3faeed 100644 --- a/graphistry/compute/gfql/ir/compilation.py +++ b/graphistry/compute/gfql/ir/compilation.py @@ -5,7 +5,7 @@ from enum import Enum from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Literal, Optional, Tuple -from graphistry.compute.gfql.ir.bound_ir import BoundIR, SemanticTable +from graphistry.compute.gfql.ir.bound_ir import BoundIR, ScopeFrame, SemanticTable from graphistry.compute.gfql.ir.logical_plan import LogicalPlan from graphistry.compute.gfql.ir.query_graph import QueryGraph @@ -80,6 +80,8 @@ class PlanContext: indexes: List[IndexDescriptor] = field(default_factory=list) backend: BackendCapabilities = field(default_factory=BackendCapabilities) config: CompilerConfig = field(default_factory=CompilerConfig) + # Pass-visible binder scope metadata for optimization safety checks. + scope_stack: Tuple[ScopeFrame, ...] = field(default_factory=tuple) @dataclass(frozen=True) diff --git a/graphistry/compute/gfql/ir/pushdown_safety.py b/graphistry/compute/gfql/ir/pushdown_safety.py index 6ae7845b4d..18baff3129 100644 --- a/graphistry/compute/gfql/ir/pushdown_safety.py +++ b/graphistry/compute/gfql/ir/pushdown_safety.py @@ -14,7 +14,7 @@ """ from __future__ import annotations -from typing import FrozenSet, List +from typing import FrozenSet, Sequence from graphistry.compute.gfql.ir.bound_ir import ScopeFrame from graphistry.compute.gfql.ir.types import BoundPredicate @@ -96,7 +96,7 @@ def is_null_safe( # --------------------------------------------------------------------------- def with_barrier_blocks_pushdown( - scope_stack: List[ScopeFrame], + scope_stack: Sequence[ScopeFrame], predicate_refs: FrozenSet[str], ) -> bool: """Return True if a WITH boundary in *scope_stack* blocks backward pushdown. diff --git a/graphistry/compute/gfql/passes/predicate_pushdown.py b/graphistry/compute/gfql/passes/predicate_pushdown.py index d4f9548eba..be79b03f71 100644 --- a/graphistry/compute/gfql/passes/predicate_pushdown.py +++ b/graphistry/compute/gfql/passes/predicate_pushdown.py @@ -8,10 +8,12 @@ import re from dataclasses import replace -from typing import Any, FrozenSet, List, Tuple, cast +from typing import Any, FrozenSet, List, Sequence, Tuple, cast +from graphistry.compute.gfql.ir.compilation import PlanContext from graphistry.compute.gfql.ir.logical_plan import Filter, LogicalPlan, PatternMatch -from graphistry.compute.gfql.ir.pushdown_safety import is_null_rejecting +from graphistry.compute.gfql.ir.pushdown_safety import is_null_rejecting, with_barrier_blocks_pushdown +from graphistry.compute.gfql.ir.bound_ir import ScopeFrame from graphistry.compute.gfql.ir.types import BoundPredicate from graphistry.compute.gfql.passes.manager import LogicalPass, PassResult @@ -21,9 +23,11 @@ class PredicatePushdownPass(LogicalPass): name = "predicate_pushdown" - def run(self, plan: LogicalPlan, ctx) -> PassResult: # noqa: ANN001 - _ = ctx - rewritten, pushed, residual = _rewrite_tree(plan) + def run(self, plan: LogicalPlan, ctx: PlanContext) -> PassResult: + rewritten, pushed, residual = _rewrite_tree( + plan, + scope_stack=ctx.scope_stack, + ) return PassResult( plan=rewritten, metadata={ @@ -34,14 +38,21 @@ def run(self, plan: LogicalPlan, ctx) -> PassResult: # noqa: ANN001 ) -def _rewrite_tree(plan: LogicalPlan) -> Tuple[LogicalPlan, int, int]: +def _rewrite_tree( + plan: LogicalPlan, + *, + scope_stack: Sequence[ScopeFrame] = (), +) -> Tuple[LogicalPlan, int, int]: pushed = 0 residual = 0 children_updates = {} for slot in ("input", "left", "right", "subquery"): child = getattr(plan, slot, None) if isinstance(child, LogicalPlan): - rewritten_child, child_pushed, child_residual = _rewrite_tree(child) + rewritten_child, child_pushed, child_residual = _rewrite_tree( + child, + scope_stack=scope_stack, + ) pushed += child_pushed residual += child_residual if rewritten_child is not child: @@ -53,13 +64,20 @@ def _rewrite_tree(plan: LogicalPlan) -> Tuple[LogicalPlan, int, int]: else plan ) if isinstance(current, Filter) and isinstance(current.input, PatternMatch): - rewritten_filter, local_pushed, local_residual = _push_filter_into_pattern(current) + rewritten_filter, local_pushed, local_residual = _push_filter_into_pattern( + current, + scope_stack=scope_stack, + ) return rewritten_filter, pushed + local_pushed, residual + local_residual return current, pushed, residual -def _push_filter_into_pattern(filter_op: Filter) -> Tuple[LogicalPlan, int, int]: +def _push_filter_into_pattern( + filter_op: Filter, + *, + scope_stack: Sequence[ScopeFrame] = (), +) -> Tuple[LogicalPlan, int, int]: assert isinstance(filter_op.input, PatternMatch) pattern = cast(PatternMatch, filter_op.input) conjuncts = _split_conjuncts(filter_op.predicate) @@ -76,6 +94,9 @@ def _push_filter_into_pattern(filter_op: Filter) -> Tuple[LogicalPlan, int, int] expression=conjunct.expression, references=_predicate_refs_for_analysis(conjunct, candidate_aliases), ) + if with_barrier_blocks_pushdown(scope_stack, analysis_predicate.references): + kept.append(conjunct) + continue if pattern.optional and is_null_rejecting(analysis_predicate, null_extended_aliases): kept.append(conjunct) continue diff --git a/graphistry/compute/gfql_unified.py b/graphistry/compute/gfql_unified.py index 11dd4c72c6..264fd1dfb6 100644 --- a/graphistry/compute/gfql_unified.py +++ b/graphistry/compute/gfql_unified.py @@ -640,7 +640,7 @@ def _execute_compiled_query_non_union( start_nodes=start_nodes, ) - ctx = PlanContext() + ctx = PlanContext(scope_stack=compiled_query.scope_stack) logical_plan = _run_logical_pass_pipeline(logical_plan, ctx) try: diff --git a/graphistry/tests/compute/gfql/cypher/test_lowering.py b/graphistry/tests/compute/gfql/cypher/test_lowering.py index d8f9511730..181515901c 100644 --- a/graphistry/tests/compute/gfql/cypher/test_lowering.py +++ b/graphistry/tests/compute/gfql/cypher/test_lowering.py @@ -740,6 +740,14 @@ def test_compiled_query_sets_logical_plan_route_for_covered_shape() -> None: assert compiled.logical_plan_defer_reason is None +def test_compiled_query_threads_bound_scope_stack_for_runtime_passes() -> None: + compiled = _compile_query("MATCH (n:Person) RETURN n") + assert compiled.scope_stack + assert compiled.scope_stack[0].origin_clause.upper() == "MATCH" + assert compiled.scope_stack[-1].origin_clause.upper() == "RETURN" + assert "n" in compiled.scope_stack[-1].visible_vars + + def test_compiled_query_sets_logical_plan_route_for_match_scalar_return_shape() -> None: compiled = _compile_query("MATCH (n:Person) RETURN 1 AS x") assert compiled.logical_plan_route == "planned" diff --git a/graphistry/tests/compute/gfql/test_predicate_pushdown_pass.py b/graphistry/tests/compute/gfql/test_predicate_pushdown_pass.py index 7cbb3d08f0..c0effc8f75 100644 --- a/graphistry/tests/compute/gfql/test_predicate_pushdown_pass.py +++ b/graphistry/tests/compute/gfql/test_predicate_pushdown_pass.py @@ -1,5 +1,6 @@ from __future__ import annotations +from graphistry.compute.gfql.ir.bound_ir import ScopeFrame from graphistry.compute.gfql.ir.compilation import PlanContext from graphistry.compute.gfql.ir.logical_plan import Filter, NodeScan, PatternMatch, Project, RowSchema from graphistry.compute.gfql.ir.types import BoundPredicate, NodeRef @@ -151,3 +152,44 @@ def test_predicate_pushdown_does_not_cross_with_like_projection_barrier() -> Non assert isinstance(result.input, Project) assert isinstance(result.input.input, PatternMatch) assert result.input.input.predicates == [] + + +def test_predicate_pushdown_blocks_when_scope_metadata_reports_with_barrier() -> None: + plan = Filter( + op_id=3, + input=PatternMatch(op_id=2, pattern={"aliases": ("n",)}, output_schema=_schema("n")), + predicate=_pred("n.age > 5", frozenset({"n"})), + output_schema=_schema("n"), + ) + ctx = PlanContext( + scope_stack=( + ScopeFrame(visible_vars=frozenset({"n"}), schema=_schema("n"), origin_clause="MATCH"), + ScopeFrame(visible_vars=frozenset({"m"}), schema=_schema("m"), origin_clause="WITH"), + ), + ) + + result = PredicatePushdownPass().run(plan, ctx).plan + + assert isinstance(result, Filter) + assert isinstance(result.input, PatternMatch) + assert result.input.predicates == [] + + +def test_predicate_pushdown_allows_when_scope_metadata_preserves_alias_across_with() -> None: + plan = Filter( + op_id=3, + input=PatternMatch(op_id=2, pattern={"aliases": ("n",)}, output_schema=_schema("n")), + predicate=_pred("n.age > 5", frozenset({"n"})), + output_schema=_schema("n"), + ) + ctx = PlanContext( + scope_stack=( + ScopeFrame(visible_vars=frozenset({"n"}), schema=_schema("n"), origin_clause="MATCH"), + ScopeFrame(visible_vars=frozenset({"n"}), schema=_schema("n"), origin_clause="WITH"), + ), + ) + + result = PredicatePushdownPass().run(plan, ctx).plan + + assert isinstance(result, PatternMatch) + assert [pred.expression for pred in result.predicates] == ["n.age > 5"]