Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- **GFQL / pass framework skeleton**: Added `graphistry/compute/gfql/passes/` with `LogicalPass`, `PassResult`, and deterministic `PassManager.run()` sequencing. The pass manager now invokes IR `verify()` after each pass and fails fast on invalid pass output. Wired a new logical-pass pipeline hook into `gfql()` execution between logical-plan and physical-planner stages using a default no-op pass configuration to preserve runtime behavior. Added focused tests for pass ordering, verifier-failure propagation, and runtime pipeline hook invocation (`test_pass_manager.py`, `test_runtime_physical_cutover.py`) (#1180).
- **GFQL / predicate pushdown safety**: Added `graphistry/compute/gfql/ir/pushdown_safety.py` with three reusable utilities for `PredicatePushdownPass`: `is_null_rejecting(pred, null_extended_aliases)` — conservative syntactic heuristic returning True when a predicate references a null-extended alias (OPTIONAL MATCH) and does not use a null-safe form (IS NULL, IS NOT NULL, COALESCE, NULLIF); `is_null_safe` — inverse; `with_barrier_blocks_pushdown(scope_stack, pred_refs)` — returns True when a WITH-clause `ScopeFrame` prevents backward predicate movement for the given reference set. All three exported from `ir/__init__.py`. 41 unit tests (#1181).
- **GFQL / predicate pushdown rewrite**: Added `PredicatePushdownPass` implementation in `graphistry/compute/gfql/passes/predicate_pushdown.py` and wired it into logical planning route execution. The pass rewrites `Filter(input=PatternMatch(...))` by pushing safe predicates into `PatternMatch.predicates`, keeps residual filters for partial-push cases, and blocks null-rejecting pushdown into optional arms using existing safety helpers. Added focused pass tests and a lowering-route integration assertion (`test_predicate_pushdown_pass.py`, `cypher/test_lowering.py`) (#1187).
- **GFQL / scope-aware pushdown barriers**: Threaded binder scope metadata (`BoundIR.scope_stack`) into runtime logical-pass context via `CompiledCypherExecutionExtras` and `PlanContext`, and updated `PredicatePushdownPass` to enforce `with_barrier_blocks_pushdown()` using real scope data before moving conjuncts into `PatternMatch.predicates`. Added targeted regressions for blocked vs allowed WITH-boundary movement and compile-route scope metadata threading (`test_predicate_pushdown_pass.py`, `cypher/test_lowering.py`) (#1190).
- **GFQL / remote wire migration (M3 follow-up)**: `chain_remote.py` remote Cypher string path no longer imports or dispatches on `CompiledCypher*` classes for wire serialization. It now validates/serializes structural compiled-query shapes (`chain`, `graph_bindings`, `procedure_call`, `use_ref`) so remote wire payload generation is decoupled from compiler IR class identity while preserving existing Let and CALL wire formats. Added parity tests in `test_chain_remote_v2.py` for structural fake compiled-query inputs (including Let bindings/result serialization and structural union rejection) (#1168).
- **GFQL / M3 compatibility-deletion gate (PR3 slice)**: Cypher public compatibility surfaces now mark `compile_cypher()` and legacy `CompiledCypher*` exports as deprecated while retaining runtime compatibility. `graphistry.compute.gfql.cypher` now serves deprecated `CompiledCypherProcedureCall`, `CompiledCypherQuery`, `CompiledCypherUnionQuery`, and `compile_cypher_query` via lazy compatibility accessors with explicit deprecation warnings; `compile_cypher()` now emits a deprecation warning and migration guidance (`g.gfql(..., language="cypher")`, `cypher_to_gfql(...)`). API docs now describe `compile_cypher()` as deprecated/internal-shape oriented. Deferred boundaries remain unchanged: remote-wire migration stays tracked in #1168 and hard removal/versioned API cleanup stays tracked in #1169 (#1174).
- **GFQL / M3 runtime cutover to PhysicalPlanner**: `gfql()` compiled-query execution in `graphistry/compute/gfql_unified.py` now dispatches through `PhysicalPlanner.plan(...)` for planned logical routes, then executes via physical operator wrappers (`same_path`, `wavefront`, `row_pipeline`). Added bounded compatibility shims for currently-required lanes (`CALL`-backed compiled queries and connected OPTIONAL wavefront payloads), and explicit validation failure when a planned wavefront route lacks executable join payload metadata (no silent unmatched fallback). Added focused runtime cutover tests in `graphistry/tests/compute/gfql/test_runtime_physical_cutover.py` covering planner invocation, optional-wavefront parity, compatibility-shim continuity, and explicit wavefront mismatch failure behavior (#1173).
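The null-rejection heuristic described in the pushdown-safety entry can be sketched as a small standalone function. This is an illustrative approximation, not the shipped `pushdown_safety.py` code: the real helpers take a `BoundPredicate` object, while this sketch takes the expression string and reference set directly, and the exact regex for the null-safe forms is an assumption.

```python
import re
from typing import FrozenSet

# Assumed null-safe syntactic forms, per the changelog entry:
# IS NULL, IS NOT NULL, COALESCE, NULLIF.
NULL_SAFE_FORMS = re.compile(r"\b(IS\s+(NOT\s+)?NULL|COALESCE|NULLIF)\b", re.IGNORECASE)


def is_null_rejecting(
    expression: str,
    references: FrozenSet[str],
    null_extended_aliases: FrozenSet[str],
) -> bool:
    """Conservative syntactic check: True when the predicate references a
    null-extended (OPTIONAL MATCH) alias and uses no null-safe form."""
    touches_optional = bool(references & null_extended_aliases)
    return touches_optional and not NULL_SAFE_FORMS.search(expression)


def is_null_safe(
    expression: str,
    references: FrozenSet[str],
    null_extended_aliases: FrozenSet[str],
) -> bool:
    # The changelog describes is_null_safe as the inverse of is_null_rejecting.
    return not is_null_rejecting(expression, references, null_extended_aliases)
```

Being conservative here matters: a false "rejecting" answer merely keeps a filter where it was, while a false "safe" answer would wrongly push a predicate into an optional arm and change results.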
29 changes: 28 additions & 1 deletion graphistry/compute/gfql/cypher/lowering.py
@@ -32,7 +32,7 @@
from graphistry.compute.chain import Chain
from graphistry.compute.exceptions import ErrorCode, GFQLValidationError
from graphistry.compute.gfql.frontends.cypher.binder import FrontendBinder
from graphistry.compute.gfql.ir.bound_ir import BoundIR
from graphistry.compute.gfql.ir.bound_ir import BoundIR, ScopeFrame
from graphistry.compute.gfql.ir.compilation import PlanContext
from graphistry.compute.gfql.ir.logical_plan import (
Join as LogicalJoin,
@@ -146,6 +146,7 @@ class CompiledCypherExecutionExtras:
optional_reentry: bool = False
scalar_reentry_alias: Optional[str] = None
scalar_reentry_columns: Tuple[str, ...] = ()
scope_stack: Tuple[ScopeFrame, ...] = ()
logical_plan: Optional[LogicalPlan] = None
logical_plan_defer_reason: Optional[str] = None

@@ -204,6 +205,10 @@ def scalar_reentry_alias(self) -> Optional[str]:
def scalar_reentry_columns(self) -> Tuple[str, ...]:
return () if self.execution_extras is None else self.execution_extras.scalar_reentry_columns

@property
def scope_stack(self) -> Tuple[ScopeFrame, ...]:
return () if self.execution_extras is None else self.execution_extras.scope_stack

@property
def logical_plan(self) -> Optional[LogicalPlan]:
return None if self.execution_extras is None else self.execution_extras.logical_plan
@@ -303,6 +308,7 @@ def _normalize_execution_extras(
and execution_extras.optional_reentry is False
and execution_extras.scalar_reentry_alias is None
and execution_extras.scalar_reentry_columns == ()
and execution_extras.scope_stack == ()
and execution_extras.logical_plan is None
and execution_extras.logical_plan_defer_reason is None
):
@@ -337,6 +343,7 @@ def _execution_extras_with(
optional_reentry: bool = False,
scalar_reentry_alias: Optional[str] = None,
scalar_reentry_columns: Tuple[str, ...] = (),
scope_stack: Tuple[ScopeFrame, ...] = (),
logical_plan: Optional[LogicalPlan] = None,
logical_plan_defer_reason: Optional[str] = None,
) -> Optional[CompiledCypherExecutionExtras]:
@@ -351,6 +358,7 @@
optional_reentry=optional_reentry,
scalar_reentry_alias=scalar_reentry_alias,
scalar_reentry_columns=scalar_reentry_columns,
scope_stack=scope_stack,
logical_plan=logical_plan,
logical_plan_defer_reason=logical_plan_defer_reason,
)
@@ -8364,6 +8372,7 @@ def _attach_logical_plan_route(
optional_reentry=result.optional_reentry,
scalar_reentry_alias=result.scalar_reentry_alias,
scalar_reentry_columns=result.scalar_reentry_columns,
scope_stack=result.scope_stack,
logical_plan=effective_logical_plan,
logical_plan_defer_reason=effective_defer_reason,
),
@@ -8430,13 +8439,30 @@ def compile_cypher_query(
compiled_bindings = ()
logical_plan: Optional[LogicalPlan] = None
logical_plan_defer_reason: Optional[str] = None
_bound_scope_stack: Tuple[ScopeFrame, ...] = ()

def _attach_graph_context(result: CompiledCypherQuery) -> CompiledCypherQuery:
out = result
if not compiled_bindings and _use_ref is None:
out = result
else:
out = replace(result, graph_bindings=compiled_bindings, use_ref=_use_ref)
out = replace(
out,
execution_extras=_execution_extras_with(
out,
connected_optional_match=out.connected_optional_match,
connected_match_join=out.connected_match_join,
query_graph=out.query_graph,
start_nodes_query=out.start_nodes_query,
optional_reentry=out.optional_reentry,
scalar_reentry_alias=out.scalar_reentry_alias,
scalar_reentry_columns=out.scalar_reentry_columns,
scope_stack=_bound_scope_stack,
logical_plan=out.logical_plan,
logical_plan_defer_reason=out.logical_plan_defer_reason,
),
)
return _attach_logical_plan_route(
out,
logical_plan=logical_plan,
@@ -8452,6 +8478,7 @@ def _attach_graph_context(result: CompiledCypherQuery) -> CompiledCypherQuery:
# Re-bind after normalization so scope and semantic metadata reflect the
# lowered query shape consumed by downstream lowering decisions.
bound_ir = FrontendBinder().bind(query, PlanContext())
_bound_scope_stack = tuple(bound_ir.scope_stack)
bound_context = _build_bound_lowering_context(bound_ir=bound_ir, params=params)
params = bound_context.params
_reject_unsupported_where_expr_forms(query)
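The `_normalize_execution_extras` guard extended in this file follows a common dataclass pattern: an extras object whose every field is still at its default carries no information and collapses to `None`, so compiled queries without extras have one canonical representation regardless of how they were built. A minimal sketch of the idea, using a toy `Extras` type rather than the real `CompiledCypherExecutionExtras`:

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Extras:
    # Toy stand-in for CompiledCypherExecutionExtras; field names are
    # borrowed from the diff, the rest of the real fields are omitted.
    optional_reentry: bool = False
    scope_stack: Tuple[str, ...] = ()


def normalize_extras(extras: Optional[Extras]) -> Optional[Extras]:
    # An all-defaults instance is equivalent to "no extras": collapse it
    # to None so equality checks and serialization stay stable.
    if extras is not None and extras == Extras():
        return None
    return extras
```

This is why the diff adds `execution_extras.scope_stack == ()` to the all-defaults conjunction: every new field must join the check, or a default-valued instance would stop normalizing to `None`.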
4 changes: 3 additions & 1 deletion graphistry/compute/gfql/ir/compilation.py
@@ -5,7 +5,7 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Literal, Optional, Tuple

from graphistry.compute.gfql.ir.bound_ir import BoundIR, SemanticTable
from graphistry.compute.gfql.ir.bound_ir import BoundIR, ScopeFrame, SemanticTable
from graphistry.compute.gfql.ir.logical_plan import LogicalPlan
from graphistry.compute.gfql.ir.query_graph import QueryGraph

@@ -80,6 +80,8 @@ class PlanContext:
indexes: List[IndexDescriptor] = field(default_factory=list)
backend: BackendCapabilities = field(default_factory=BackendCapabilities)
config: CompilerConfig = field(default_factory=CompilerConfig)
# Pass-visible binder scope metadata for optimization safety checks.
scope_stack: Tuple[ScopeFrame, ...] = field(default_factory=tuple)


@dataclass(frozen=True)
4 changes: 2 additions & 2 deletions graphistry/compute/gfql/ir/pushdown_safety.py
@@ -14,7 +14,7 @@
"""
from __future__ import annotations

from typing import FrozenSet, List
from typing import FrozenSet, Sequence

from graphistry.compute.gfql.ir.bound_ir import ScopeFrame
from graphistry.compute.gfql.ir.types import BoundPredicate
@@ -96,7 +96,7 @@ def is_null_safe(
# ---------------------------------------------------------------------------

def with_barrier_blocks_pushdown(
scope_stack: List[ScopeFrame],
scope_stack: Sequence[ScopeFrame],
predicate_refs: FrozenSet[str],
) -> bool:
"""Return True if a WITH boundary in *scope_stack* blocks backward pushdown.
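The barrier rule that `with_barrier_blocks_pushdown` implements is easy to state: moving a predicate backwards past a WITH projection is only safe when every alias the predicate references survives that projection, because re-projection can change row multiplicity and visibility. The sketch below is an assumption about the shape of the real helper (whose `ScopeFrame` also carries a schema); it matches the blocked vs allowed behavior exercised in `test_predicate_pushdown_pass.py` but is deliberately simplified.

```python
from dataclasses import dataclass
from typing import FrozenSet, Sequence


@dataclass(frozen=True)
class ScopeFrame:
    # Simplified stand-in for the binder's ScopeFrame (schema omitted).
    visible_vars: FrozenSet[str]
    origin_clause: str  # e.g. "MATCH", "WITH", "RETURN"


def with_barrier_blocks_pushdown(
    scope_stack: Sequence[ScopeFrame],
    predicate_refs: FrozenSet[str],
) -> bool:
    """True when some WITH frame hides at least one referenced alias,
    so the predicate must stay on its side of the projection."""
    for frame in scope_stack:
        if frame.origin_clause.upper() == "WITH" and not predicate_refs <= frame.visible_vars:
            return True
    return False
```

An empty scope stack (the pre-threading default) reports no barrier, which is why the runtime change below that populates `PlanContext.scope_stack` with real binder data is what actually activates this safety check.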
39 changes: 30 additions & 9 deletions graphistry/compute/gfql/passes/predicate_pushdown.py
@@ -8,10 +8,12 @@

import re
from dataclasses import replace
from typing import Any, FrozenSet, List, Tuple, cast
from typing import Any, FrozenSet, List, Sequence, Tuple, cast

from graphistry.compute.gfql.ir.compilation import PlanContext
from graphistry.compute.gfql.ir.logical_plan import Filter, LogicalPlan, PatternMatch
from graphistry.compute.gfql.ir.pushdown_safety import is_null_rejecting
from graphistry.compute.gfql.ir.pushdown_safety import is_null_rejecting, with_barrier_blocks_pushdown
from graphistry.compute.gfql.ir.bound_ir import ScopeFrame
from graphistry.compute.gfql.ir.types import BoundPredicate
from graphistry.compute.gfql.passes.manager import LogicalPass, PassResult

@@ -21,9 +23,11 @@ class PredicatePushdownPass(LogicalPass):

name = "predicate_pushdown"

def run(self, plan: LogicalPlan, ctx) -> PassResult: # noqa: ANN001
_ = ctx
rewritten, pushed, residual = _rewrite_tree(plan)
def run(self, plan: LogicalPlan, ctx: PlanContext) -> PassResult:
rewritten, pushed, residual = _rewrite_tree(
plan,
scope_stack=ctx.scope_stack,
)
return PassResult(
plan=rewritten,
metadata={
@@ -34,14 +38,21 @@
)


def _rewrite_tree(plan: LogicalPlan) -> Tuple[LogicalPlan, int, int]:
def _rewrite_tree(
plan: LogicalPlan,
*,
scope_stack: Sequence[ScopeFrame] = (),
) -> Tuple[LogicalPlan, int, int]:
pushed = 0
residual = 0
children_updates = {}
for slot in ("input", "left", "right", "subquery"):
child = getattr(plan, slot, None)
if isinstance(child, LogicalPlan):
rewritten_child, child_pushed, child_residual = _rewrite_tree(child)
rewritten_child, child_pushed, child_residual = _rewrite_tree(
child,
scope_stack=scope_stack,
)
pushed += child_pushed
residual += child_residual
if rewritten_child is not child:
@@ -53,13 +64,20 @@
else plan
)
if isinstance(current, Filter) and isinstance(current.input, PatternMatch):
rewritten_filter, local_pushed, local_residual = _push_filter_into_pattern(current)
rewritten_filter, local_pushed, local_residual = _push_filter_into_pattern(
current,
scope_stack=scope_stack,
)
return rewritten_filter, pushed + local_pushed, residual + local_residual

return current, pushed, residual


def _push_filter_into_pattern(filter_op: Filter) -> Tuple[LogicalPlan, int, int]:
def _push_filter_into_pattern(
filter_op: Filter,
*,
scope_stack: Sequence[ScopeFrame] = (),
) -> Tuple[LogicalPlan, int, int]:
assert isinstance(filter_op.input, PatternMatch)
pattern = cast(PatternMatch, filter_op.input)
conjuncts = _split_conjuncts(filter_op.predicate)
@@ -76,6 +94,9 @@ def _push_filter_into_pattern(filter_op: Filter) -> Tuple[LogicalPlan, int, int]
expression=conjunct.expression,
references=_predicate_refs_for_analysis(conjunct, candidate_aliases),
)
if with_barrier_blocks_pushdown(scope_stack, analysis_predicate.references):
kept.append(conjunct)
continue
if pattern.optional and is_null_rejecting(analysis_predicate, null_extended_aliases):
kept.append(conjunct)
continue
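The per-conjunct decision order added to `_push_filter_into_pattern` (WITH-barrier check first, then the optional-arm null-rejection check, then push) can be summarized as a pure function. Names and callback signatures here are illustrative, not the module's API; the real code operates on `BoundPredicate` conjuncts and a `PatternMatch` node.

```python
from typing import Callable, List, Tuple


def classify_conjuncts(
    conjuncts: List[str],
    blocked_by_barrier: Callable[[str], bool],
    pattern_is_optional: bool,
    null_rejecting: Callable[[str], bool],
) -> Tuple[List[str], List[str]]:
    """Split filter conjuncts into (pushed, kept), mirroring the order of
    safety checks in the pass: barrier first, then optional-arm
    null-rejection, then push everything that survives both checks."""
    pushed: List[str] = []
    kept: List[str] = []
    for conjunct in conjuncts:
        if blocked_by_barrier(conjunct):
            kept.append(conjunct)  # WITH boundary: cannot move backwards
        elif pattern_is_optional and null_rejecting(conjunct):
            kept.append(conjunct)  # would defeat OPTIONAL MATCH semantics
        else:
            pushed.append(conjunct)
    return pushed, kept
```

A non-empty `kept` list is what produces the residual `Filter` in the partial-push case, while an empty one lets the pass replace the `Filter` node with the enriched `PatternMatch` outright.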
2 changes: 1 addition & 1 deletion graphistry/compute/gfql_unified.py
@@ -640,7 +640,7 @@ def _execute_compiled_query_non_union(
start_nodes=start_nodes,
)

ctx = PlanContext()
ctx = PlanContext(scope_stack=compiled_query.scope_stack)
logical_plan = _run_logical_pass_pipeline(logical_plan, ctx)

try:
8 changes: 8 additions & 0 deletions graphistry/tests/compute/gfql/cypher/test_lowering.py
@@ -740,6 +740,14 @@ def test_compiled_query_sets_logical_plan_route_for_covered_shape() -> None:
assert compiled.logical_plan_defer_reason is None


def test_compiled_query_threads_bound_scope_stack_for_runtime_passes() -> None:
compiled = _compile_query("MATCH (n:Person) RETURN n")
assert compiled.scope_stack
assert compiled.scope_stack[0].origin_clause.upper() == "MATCH"
assert compiled.scope_stack[-1].origin_clause.upper() == "RETURN"
assert "n" in compiled.scope_stack[-1].visible_vars


def test_compiled_query_sets_logical_plan_route_for_match_scalar_return_shape() -> None:
compiled = _compile_query("MATCH (n:Person) RETURN 1 AS x")
assert compiled.logical_plan_route == "planned"
42 changes: 42 additions & 0 deletions graphistry/tests/compute/gfql/test_predicate_pushdown_pass.py
@@ -1,5 +1,6 @@
from __future__ import annotations

from graphistry.compute.gfql.ir.bound_ir import ScopeFrame
from graphistry.compute.gfql.ir.compilation import PlanContext
from graphistry.compute.gfql.ir.logical_plan import Filter, NodeScan, PatternMatch, Project, RowSchema
from graphistry.compute.gfql.ir.types import BoundPredicate, NodeRef
@@ -151,3 +152,44 @@ def test_predicate_pushdown_does_not_cross_with_like_projection_barrier() -> None
assert isinstance(result.input, Project)
assert isinstance(result.input.input, PatternMatch)
assert result.input.input.predicates == []


def test_predicate_pushdown_blocks_when_scope_metadata_reports_with_barrier() -> None:
plan = Filter(
op_id=3,
input=PatternMatch(op_id=2, pattern={"aliases": ("n",)}, output_schema=_schema("n")),
predicate=_pred("n.age > 5", frozenset({"n"})),
output_schema=_schema("n"),
)
ctx = PlanContext(
scope_stack=(
ScopeFrame(visible_vars=frozenset({"n"}), schema=_schema("n"), origin_clause="MATCH"),
ScopeFrame(visible_vars=frozenset({"m"}), schema=_schema("m"), origin_clause="WITH"),
),
)

result = PredicatePushdownPass().run(plan, ctx).plan

assert isinstance(result, Filter)
assert isinstance(result.input, PatternMatch)
assert result.input.predicates == []


def test_predicate_pushdown_allows_when_scope_metadata_preserves_alias_across_with() -> None:
plan = Filter(
op_id=3,
input=PatternMatch(op_id=2, pattern={"aliases": ("n",)}, output_schema=_schema("n")),
predicate=_pred("n.age > 5", frozenset({"n"})),
output_schema=_schema("n"),
)
ctx = PlanContext(
scope_stack=(
ScopeFrame(visible_vars=frozenset({"n"}), schema=_schema("n"), origin_clause="MATCH"),
ScopeFrame(visible_vars=frozenset({"n"}), schema=_schema("n"), origin_clause="WITH"),
),
)

result = PredicatePushdownPass().run(plan, ctx).plan

assert isinstance(result, PatternMatch)
assert [pred.expression for pred in result.predicates] == ["n.age > 5"]