24 changes: 21 additions & 3 deletions CHANGELOG.md
@@ -10,16 +10,34 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)

### Added
- **Telemetry**: Added optional OpenTelemetry helper and propagated trace headers through plot/upload/remote GFQL paths.
- **GFQL / WHERE** (experimental): Added `Chain.where` field for same-path WHERE clause constraints. New modules: `same_path_types.py`, `df_executor.py`, and `same_path/` submodules implementing Yannakakis-style semijoin reduction for efficient WHERE filtering. Supports equality, inequality, and comparison operators on named alias columns.
- **GFQL / WHERE**: `gfql([...], where=[...])` list form now supports same-path WHERE constraints (no need to wrap in `Chain(...)`).
- **GFQL / cuDF same-path**: Added execution-mode gate `GRAPHISTRY_CUDF_SAME_PATH_MODE` (auto/oracle/strict) for GFQL cuDF same-path executor. Auto falls back to oracle when GPU unavailable; strict requires cuDF or raises.
- **GFQL / WHERE**: Added opt-in `GRAPHISTRY_NON_ADJ_WHERE_MULTI_EQ_SEMIJOIN` for multi-equality semijoin pruning (2-hop, experimental).
- **GFQL / WHERE**: Added opt-in `GRAPHISTRY_NON_ADJ_WHERE_INEQ_AGG` for aggregated inequality pruning on 2-hop non-adj clauses (experimental).
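
A minimal usage sketch of the list-form `gfql(..., where=[...])` and the env gates named in the bullets above. The `col`/`compare` call signatures, the `">"` operator spelling, the alias/column arguments, and the `"1"` env values are illustrative assumptions, not the confirmed API:

```python
# Hypothetical sketch of the list-form gfql(..., where=[...]) API.
# col/compare signatures and env-var values are assumptions, not confirmed.
import os
import pandas as pd
import graphistry
from graphistry import n, e_forward, col, compare  # col/compare: new WHERE helpers

# Opt in to the experimental pruning paths named above (truthy value assumed)
os.environ["GRAPHISTRY_NON_ADJ_WHERE_MULTI_EQ_SEMIJOIN"] = "1"
os.environ["GRAPHISTRY_NON_ADJ_WHERE_INEQ_AGG"] = "1"

edges = pd.DataFrame({"src": ["a", "b"], "dst": ["b", "c"]})
nodes = pd.DataFrame({"id": ["a", "b", "c"], "score": [3, 2, 1]})
g = graphistry.edges(edges, "src", "dst").nodes(nodes, "id")

# Same-path constraint between named aliases; operator spelling is an assumption
g2 = g.gfql(
    [n(name="start"), e_forward(), n(name="end")],
    where=[compare(col("start", "score"), ">", col("end", "score"))],
)
print(g2._nodes)
```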

### Performance
- **Compute / hop**: Added fast-path traversal and domain-based visited/frontier tracking; unified direction handling.

## [0.50.6 - 2026-01-27]
- **GFQL / WHERE**: Use DF-native forward pruning for cuDF equality constraints to avoid host syncs (pandas path unchanged).
- **GFQL / WHERE**: Default non-adjacent WHERE mode is now `auto`, enabling value-mode and domain semijoin automatically, with edge semijoin enabled automatically for edge clauses (opt-out via env).
- **GFQL / WHERE**: Auto mode skips value-mode on multi-clause non-adjacent WHERE when pair estimates exceed the semijoin threshold (guardrail against blowups).
- **GFQL / WHERE**: Avoid building semijoin pair tables when AUTO semijoin stays inactive; use cheap pair estimates to gate the work.
- **GFQL / WHERE**: Reduce semijoin dedup overhead and reuse cached edge pairs per edge when `allowed_edges` is unset.

### Fixed
- **GFQL / hypergraph**: Avoid `DataFrame.style` access when `return_as` yields a DataFrame, preventing import errors in minimal environments without Jinja2 (PR #909).
- **GFQL / chain**: Fixed `from_json` to validate `where` field type before casting, preventing type errors on malformed input.
- **GFQL / WHERE**: Fixed undirected edge handling in WHERE clause filtering to check both src→dst and dst→src directions.
- **GFQL / WHERE**: Fixed multi-hop path edge retention to keep all edges in valid paths, not just terminal edges.
- **GFQL / WHERE**: Fixed unfiltered start node handling with multi-hop edges in native path executor.
- **GFQL / WHERE**: Fixed vector-strategy guard to initialize start/end domains before pair-est gating (prevents UnboundLocalError).

### Infra
- **GFQL / same_path**: Modular architecture for WHERE execution: `same_path_types.py` (types), `df_executor.py` (execution), plus `same_path/` submodules for BFS, edge semantics, multihop, and WHERE filtering.
- **GFQL / WHERE**: Added OTel detail counters for semijoin pair sizes and mid-intersection sizes to help diagnose dense multi-clause blowups.

### Tests
- **GFQL / df_executor**: Added comprehensive test suite (core, amplify, patterns, dimension) with 200+ tests covering Yannakakis semijoin, WHERE clause filtering, multi-hop paths, and pandas/cuDF parity.
- **GFQL / cuDF same-path**: Added strict/auto mode coverage for cuDF executor fallback behavior.
- **Temporal**: Added datetime unit parity coverage (ms/us/ns) for ring layouts, GFQL time ring layouts, and temporal comparison predicates; relaxed honeypot hypergraph datetime unit expectations.

## [0.50.5 - 2026-01-25]
8 changes: 8 additions & 0 deletions docs/pr_notes/pr-886-where.md
@@ -0,0 +1,8 @@
# PR 886 Notes: GFQL WHERE

## GPU toggles / experiments
- `GRAPHISTRY_CUDF_SAME_PATH_MODE=auto|oracle|strict` controls same-path executor selection when `Engine.CUDF` is requested.
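
A small sketch of setting the mode gate from Python before running a GPU-engine GFQL query; that the gate is read from the environment at query time is an assumption for illustration:

```python
# Hypothetical toggle of the same-path executor gate; valid values per this note
# are auto / oracle / strict. Reading the gate at query time is an assumption.
import os

os.environ["GRAPHISTRY_CUDF_SAME_PATH_MODE"] = "strict"  # require cuDF or raise
# os.environ["GRAPHISTRY_CUDF_SAME_PATH_MODE"] = "auto"   # fall back to oracle when no GPU
# os.environ["GRAPHISTRY_CUDF_SAME_PATH_MODE"] = "oracle" # force the oracle executor

# ...then run the GFQL query with the cuDF engine requested.
```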

## Commits worth toggling (GPU perf/debug)
- d1e11784 perf(df_executor): DF-native cuDF forward prune
- e85fa8e7 fix(filter_by_dict): allow bool filters on object columns
2 changes: 2 additions & 0 deletions graphistry/__init__.py
@@ -90,6 +90,8 @@
isna, IsNA,
notna, NotNA,

col, compare,

contains, Contains,
startswith, Startswith,
endswith, Endswith,
69 changes: 16 additions & 53 deletions graphistry/compute/ComputeMixin.py
@@ -46,35 +46,25 @@ def _safe_len(df: Any) -> int:
Monitor: https://github.com/rapidsai/dask-cuda/issues and https://github.com/rapidsai/cudf/issues
for fixes to groupby aggregation errors on empty DataFrames.
"""
# Check type module without importing dask_cudf (dask imports are slow)
type_module = type(df).__module__
if 'dask_cudf' in type_module:
try:
# Only import if we're reasonably sure it's a dask_cudf DataFrame
import dask_cudf
if isinstance(df, dask_cudf.DataFrame):
# Use map_partitions to get length of each partition, then sum
# This avoids the problematic groupby aggregations that fail on lazy operations
try:
# map_partitions(len) returns scalar per partition, forming a Series
# meta should be pd.Series with appropriate dtype, not bare int
partition_lengths = df.map_partitions(len, meta=pd.Series([], dtype='int64'))
total_length = partition_lengths.sum().compute()
return int(total_length)
except Exception as e:
logger.warning("Could not compute length for dask_cudf DataFrame via map_partitions: %s", e)
# Fallback: try direct compute (may fail on empty DataFrames with lazy ops)
return len(df.compute())
except ImportError as e:
# Unexpected: module name contains 'dask_cudf' but can't import - raise it
logger.error("DataFrame type from dask_cudf module but import failed: %s", e)
raise
except AttributeError as e:
# Unexpected: imported dask_cudf but isinstance/attribute access failed
logger.error("Imported dask_cudf but attribute error occurred: %s", e)
raise

# For all other DataFrame types, use standard len()
return len(df)


@@ -169,22 +159,33 @@ def materialize_nodes(
if isinstance(engine, str):
engine = EngineAbstract(engine)

g = self
g: Plottable = self

if engine != EngineAbstract.AUTO:
engine_val = Engine(engine.value)
if engine_val == Engine.CUDF:
if g._nodes is not None and isinstance(g._nodes, pd.DataFrame):
import cudf
g = g.nodes(cudf.DataFrame.from_pandas(g._nodes), g._node)
if g._edges is not None and isinstance(g._edges, pd.DataFrame):
import cudf
g = g.edges(cudf.DataFrame.from_pandas(g._edges), g._source, g._destination, edge=g._edge)
elif engine_val == Engine.PANDAS:
if g._nodes is not None and 'cudf' in type(g._nodes).__module__ and 'dask' not in type(g._nodes).__module__:
g = g.nodes(g._nodes.to_pandas(), g._node)
if g._edges is not None and 'cudf' in type(g._edges).__module__ and 'dask' not in type(g._edges).__module__:
g = g.edges(g._edges.to_pandas(), g._source, g._destination, edge=g._edge)

# Check reuse first - if we have nodes and reuse is True, just return
if reuse:
if g._nodes is not None and _safe_len(g._nodes) > 0:
if g._node is None:
logger.warning(
"Must set node id binding, not just nodes; set via .bind() or .nodes()"
)
# raise ValueError('Must set node id binding, not just nodes; set via .bind() or .nodes()')
else:
return g

# Only check for edges if we actually need to materialize
if g._edges is None:
# If no edges but we have nodes via reuse, that's OK
if reuse and g._nodes is not None and _safe_len(g._nodes) > 0:
return g
raise ValueError("Missing edges")
@@ -194,7 +195,6 @@
)
if _safe_len(g._edges) == 0:
return g
# TODO use built-ins for igraph/nx/...

node_id = g._node if g._node is not None else "id"
engine_concrete : Engine
@@ -223,7 +223,6 @@ def raiser(df: Any):
else:
engine_concrete = Engine(engine.value)

# Use engine-specific concat for Series (pd.concat/cudf.concat work with Series directly)
concat_fn = df_concat(engine_concrete)
concat_df = concat_fn([g._edges[g._source], g._edges[g._destination]])
nodes_df = concat_df.rename(node_id).drop_duplicates().to_frame().reset_index(drop=True)
@@ -234,13 +233,9 @@ def get_indegrees(self, col: str = "degree_in"):
g = self
g_nodes = g.materialize_nodes()

# Handle empty edges case - skip groupby for dask_cudf compatibility
# When edges are empty, all nodes have in-degree of 0
if _safe_len(g._edges) == 0:
if col not in g_nodes._nodes.columns:
# Use assign() for engine compatibility (pandas, cudf, dask, dask_cudf)
nodes_df = g_nodes._nodes.assign(**{col: 0})
# Convert to int32 to match normal degree column dtype
nodes_df = nodes_df.assign(**{col: nodes_df[col].astype("int32")})
else:
nodes_df = g_nodes._nodes.copy()
@@ -254,7 +249,6 @@ def get_indegrees(self, col: str = "degree_in"):
.rename(columns={g._source: col, g._destination: g_nodes._node})
)

# Use safe_merge for engine type coercion
nodes_subset = g_nodes._nodes[
[c for c in g_nodes._nodes.columns if c != col]
]
@@ -339,7 +333,6 @@ def keep_nodes(self, nodes):
"""
g = self.materialize_nodes()

#convert to Dict[Str, Union[Series, List-like]]
if isinstance(nodes, dict):
pass
elif isinstance(nodes, np.ndarray) or isinstance(nodes, list):
@@ -353,41 +346,28 @@ def keep_nodes(self, nodes):
nodes = {g._node: nodes.to_numpy()}
else:
raise ValueError('Unexpected nodes type: {}'.format(type(nodes)))
#convert to Dict[Str, List-like]
#print('nodes mid', nodes)
nodes = {
k: v if isinstance(v, np.ndarray) or isinstance(v, list) else v.to_numpy()
for k, v in nodes.items()
}

#print('self nodes', g._nodes)
#print('pre nodes', nodes)
#print('keys', list(nodes.keys()))
hits = g._nodes[list(nodes.keys())].isin(nodes)
#print('hits', hits)
hits_s = hits[g._node]
for c in hits.columns:
if c != g._node:
hits_s = hits_s & hits[c]
#print('hits_s', hits_s)
new_nodes = g._nodes[hits_s]
#print(new_nodes)
new_node_ids = new_nodes[g._node].to_numpy()
#print('new_node_ids', new_node_ids)
#print('new node_ids', type(new_node_ids), len(g._nodes), '->', len(new_node_ids))
new_edges_hits_df = (
g._edges[[g._source, g._destination]]
.isin({
g._source: new_node_ids,
g._destination: new_node_ids
})
)
#print('new_edges_hits_df', new_edges_hits_df)
new_edges = g._edges[
new_edges_hits_df[g._source] & new_edges_hits_df[g._destination]
]
#print('new_edges', new_edges)
#print('new edges', len(g._edges), '->', len(new_edges))
return g.nodes(new_nodes).edges(new_edges)

def get_topological_levels(
@@ -436,7 +416,6 @@ def get_topological_levels(
raise ValueError(
"Cyclic graph in get_topological_levels(); remove cycles or set allow_cycles=True"
)
# tie break by picking biggest node
max_degree = g2._nodes["degree"].max()
roots = g2._nodes[g2._nodes["degree"] == max_degree][:1]
if warn_cycles:
@@ -459,7 +438,6 @@ def get_topological_levels(
g2 = g2.drop_nodes(roots[g2._node])
nodes_df0 = nodes_with_levels[0]
if len(nodes_with_levels) > 1:
# Use engine-aware concat for cuDF/pandas compatibility
engine = resolve_engine(EngineAbstract.AUTO, nodes_df0)
concat_fn = df_concat(engine)
nodes_df = concat_fn([nodes_df0] + nodes_with_levels[1:])
@@ -469,8 +447,6 @@
if self._nodes is None:
return self.nodes(nodes_df)
else:
# use orig cols, esp. in case collisions like degree
# Use safe_merge for engine type coercion
levels_df = nodes_df[[g2_base._node, level_col]]
out_df = safe_merge(g2_base._nodes, levels_df, on=g2_base._node, how='left')
return self.nodes(out_df)
@@ -503,7 +479,6 @@ def collapse(
:returns:A new Graphistry instance with nodes and edges DataFrame containing collapsed nodes and edges given by column attribute -- nodes and edges DataFrames contain six new columns `collapse_{node | edges}` and `final_{node | edges}`, while original (node, src, dst) columns are left untouched
:rtype: Plottable
"""
# TODO FIXME CHECK SELF LOOPS?
return collapse_by(
self,
start_node=node,
@@ -541,17 +516,7 @@ def chain(self, *args, **kwargs):
stacklevel=2
)
return chain_base(self, *args, **kwargs)
# Preserve original docstring after deprecation notice
chain.__doc__ = (chain.__doc__ or "") + "\n\n" + (chain_base.__doc__ or "")

# chain_let removed from public API - use gfql() instead
# (chain_let_base still available internally for gfql dispatch)

# Commented out to remove from public API - use gfql() instead
# def chain_let(self, *args, **kwargs):
# """Execute a DAG of named graph operations with dependency resolution."""
# return chain_let_base(self, *args, **kwargs)
# chain_let.__doc__ = chain_let_base.__doc__

def gfql(self, *args, **kwargs):
return gfql_base(self, *args, **kwargs)
@@ -569,7 +534,6 @@ def chain_remote(self, *args, **kwargs) -> Plottable:
stacklevel=2
)
return chain_remote_base(self, *args, **kwargs)
# Preserve original docstring after deprecation notice
chain_remote.__doc__ = (chain_remote.__doc__ or "") + "\n\n" + (chain_remote_base.__doc__ or "")

def chain_remote_shape(self, *args, **kwargs) -> pd.DataFrame:
@@ -584,7 +548,6 @@ def chain_remote_shape(self, *args, **kwargs) -> pd.DataFrame:
stacklevel=2
)
return chain_remote_shape_base(self, *args, **kwargs)
# Preserve original docstring after deprecation notice
chain_remote_shape.__doc__ = (chain_remote_shape.__doc__ or "") + "\n\n" + (chain_remote_shape_base.__doc__ or "")

def gfql_remote(
3 changes: 3 additions & 0 deletions graphistry/compute/__init__.py
@@ -57,6 +57,7 @@
isnull, IsNull,
notnull, NotNull,
)
from .gfql.same_path_types import col, compare
from .typing import DataFrameT

__all__ = [
@@ -94,6 +95,8 @@
'isalnum', 'IsAlnum', 'isdecimal', 'IsDecimal',
'istitle', 'IsTitle', 'isnull', 'IsNull',
'notnull', 'NotNull',
# WHERE helpers
'col', 'compare',
# Types
'DataFrameT'
]
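
A quick sanity check that the new WHERE helpers are importable from both public entry points touched by this diff; the assumption that both paths re-export the same objects is noted in the comments:

```python
# Check that the new WHERE helpers are exposed from both public entry points
# wired up in this PR (top-level graphistry and graphistry.compute).
from graphistry import col, compare
from graphistry.compute import col as col_c, compare as compare_c

# Both paths are expected to expose the same helpers (assumption: simple re-export)
print(col, col_c)
print(compare, compare_c)
```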