Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [1.8.0] - 2026-04-07

### Added

- **Grafeo embedded graph database backend**: New `GrafeoConnection` and `GrafeoConfig` classes for Grafeo, an in-process graph engine that requires no server, no Docker, and no network. Supports in-memory (`GrafeoConfig.in_memory()`) and file-backed persistent (`GrafeoConfig(path=...)`) storage. Queries use GQL/Cypher-compatible syntax. The `grafeo>=0.5.33` package ships as a core dependency, as is the case for the other database backends.

### Documentation

- Installation, quickstart, landing page, and examples 1–3/6–8 updated with Grafeo configuration examples.
- API reference stubs added for `graflo.db.grafeo` module.

## [1.7.12] - 2026-04-06

### Changed
Expand Down
239 changes: 195 additions & 44 deletions graflo/db/grafeo/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def execute(self, query: str, **kwargs: Any) -> Any:
params = kwargs if kwargs else None
return self.db.execute(query, params)

def close(self):
def close(self) -> None:
"""Close the Grafeo database."""
if self.db is not None:
self.db.close()
Expand All @@ -148,7 +148,7 @@ def close(self):
# Database / graph lifecycle
# ------------------------------------------------------------------

def create_database(self, name: str):
def create_database(self, name: str) -> None:
"""Create a new database (no-op for Grafeo).

Grafeo is a single embedded graph with no multi-database concept.
Expand All @@ -158,7 +158,7 @@ def create_database(self, name: str):
"""
logger.debug("create_database('%s') is a no-op for Grafeo", name)

def delete_database(self, name: str):
def delete_database(self, name: str) -> None:
"""Delete a database (no-op for Grafeo).

Grafeo is a single embedded graph with no multi-database concept.
Expand All @@ -168,7 +168,7 @@ def delete_database(self, name: str):
"""
logger.debug("delete_database('%s') is a no-op for Grafeo", name)

def define_schema(self, schema: Schema):
def define_schema(self, schema: Schema) -> None:
"""Define vertex and edge classes based on schema.

Note: This is a no-op in Grafeo as labels and relationship types
Expand Down Expand Up @@ -205,7 +205,13 @@ def delete_graph_structure(
graph_names: tuple[str, ...] | list[str] = (),
delete_all: bool = False,
) -> None:
"""Delete nodes and relationships from the Grafeo graph."""
"""Delete nodes and relationships from the Grafeo graph.

Args:
vertex_types: Label names to delete nodes for.
graph_names: Unused in Grafeo (single embedded graph).
delete_all: If True, delete all nodes and relationships.
"""
if delete_all:
try:
self.execute("MATCH (n) DETACH DELETE n")
Expand Down Expand Up @@ -252,7 +258,13 @@ def init_db(self, schema: Schema, recreate_schema: bool) -> None:
logger.info("Grafeo database initialized")

def clear_data(self, schema: Schema) -> None:
"""Remove all data without dropping the schema."""
"""Remove all data without dropping the schema.

Deletes nodes and relationships for schema-managed labels only.

Args:
schema: Schema containing vertex and edge class definitions.
"""
vc = schema.resolve_db_aware(DBType.GRAFEO).vertex_config
vertex_types = tuple(vc.vertex_dbname(v) for v in vc.vertex_set)
if vertex_types:
Expand All @@ -264,8 +276,18 @@ def clear_data(self, schema: Schema) -> None:

def define_vertex_indexes(
self, vertex_config: VertexConfig, schema: Schema | None = None
):
"""Create property indexes for vertex labels."""
) -> None:
"""Create property indexes for vertex labels.

Creates indexes for each vertex label based on the configuration.
Grafeo supports property indexes on node labels. Identity index is
prepended when schema is provided.

Args:
vertex_config: Vertex configuration containing index definitions.
schema: Optional schema for resolving DB-aware vertex names and
identity fields. If ``None``, no indexes are created.
"""
if schema is None:
return
db_vc = schema.resolve_db_aware(DBType.GRAFEO).vertex_config
Expand All @@ -281,8 +303,17 @@ def define_vertex_indexes(
for index_obj in index_list:
self._add_index(label, index_obj)

def define_edge_indexes(self, edges: list[Edge], schema: Schema | None = None):
"""Create property indexes for relationship types."""
def define_edge_indexes(self, edges: list[Edge], schema: Schema | None = None) -> None:
"""Create property indexes for relationship types.

Creates indexes for each relationship type based on the configuration.
Grafeo supports property indexes on relationship types.

Args:
edges: List of edge configurations containing index definitions.
schema: Optional schema for resolving secondary edge indexes.
If ``None``, no indexes are created.
"""
if schema is None:
return
for edge in edges:
Expand Down Expand Up @@ -383,7 +414,15 @@ def upsert_docs_batch(
def insert_return_batch(
self, docs: list[dict[str, Any]], class_name: str
) -> list[dict[str, Any]]:
"""Insert nodes and return their properties."""
"""Insert nodes and return their properties.

Args:
docs: Documents to insert.
class_name: Label to insert into.

Returns:
List of inserted node properties as dictionaries.
"""
results = []
for doc in docs:
sanitized = self._sanitize_doc(doc)
Expand Down Expand Up @@ -416,6 +455,20 @@ def insert_edges_batch(
"""Create relationships between existing nodes using MERGE.

Each element of *docs_edges* is ``[source_dict, target_dict, props_dict]``.

Args:
docs_edges: Edge specifications as list of
``[source, target, props]`` triples.
source_class: Label of source nodes.
target_class: Label of target nodes.
relation_name: Relationship type name.
match_keys_source: Properties to match source nodes.
match_keys_target: Properties to match target nodes.
filter_uniques: Unused in Grafeo (MERGE handles uniqueness
automatically).
head: Optional limit on number of relationships to insert.
**kwargs: See :meth:`graflo.db.conn.Connection.insert_edges_batch` /
:func:`graflo.db.conn.consume_insert_edges_kwargs`.
"""
opts = consume_insert_edges_kwargs(kwargs)
dry = opts.dry
Expand All @@ -435,39 +488,74 @@ def insert_edges_batch(
logger.warning("Skipping invalid edge format: %s", edge)
continue

src_raw, tgt_raw, props_raw = edge
src = self._sanitize_doc(
src_raw if isinstance(src_raw, dict) else {},
list(match_keys_source),
)
tgt = self._sanitize_doc(
tgt_raw if isinstance(tgt_raw, dict) else {},
list(match_keys_target),
)
props = self._sanitize_doc(
props_raw if isinstance(props_raw, dict) else {}
query = self._build_edge_merge_query(
edge,
source_class,
target_class,
relation_name,
match_keys_source,
match_keys_target,
merge_props,
)
if not dry:
self.execute(query)

src_match = _match_clause(match_keys_source, src)
tgt_match = _match_clause(match_keys_target, tgt)
def _build_edge_merge_query(
self,
edge: list[dict[str, Any]],
source_class: str,
target_class: str,
relation_name: str,
match_keys_source: tuple[str, ...],
match_keys_target: tuple[str, ...],
merge_props: tuple[str, ...] | None,
) -> str:
"""Build a GQL MERGE query for a single edge triple.

if merge_props:
merge_map = ", ".join(
f"`{p}`: {_gql_literal(props.get(p))}" for p in merge_props
)
rel_pattern = f"[r:`{relation_name}` {{{merge_map}}}]"
else:
rel_pattern = f"[r:`{relation_name}`]"
Args:
edge: A ``[source_dict, target_dict, props_dict]`` triple.
source_class: Label of the source node.
target_class: Label of the target node.
relation_name: Relationship type name.
match_keys_source: Properties to match source nodes.
match_keys_target: Properties to match target nodes.
merge_props: Optional property names included in the MERGE
pattern so parallel edges are kept distinct.

props_literal = _props_map(props)
query = (
f"MATCH (a:`{source_class}` {{{src_match}}}), "
f"(b:`{target_class}` {{{tgt_match}}})\n"
f"MERGE (a)-{rel_pattern}->(b)\n"
f"SET r += {props_literal}"
Returns:
GQL query string for the MERGE operation.
"""
src_raw, tgt_raw, props_raw = edge
src = self._sanitize_doc(
src_raw if isinstance(src_raw, dict) else {},
list(match_keys_source),
)
tgt = self._sanitize_doc(
tgt_raw if isinstance(tgt_raw, dict) else {},
list(match_keys_target),
)
props = self._sanitize_doc(
props_raw if isinstance(props_raw, dict) else {}
)

src_match = _match_clause(match_keys_source, src)
tgt_match = _match_clause(match_keys_target, tgt)

if merge_props:
merge_map = ", ".join(
f"`{p}`: {_gql_literal(props.get(p))}" for p in merge_props
)
if not dry:
self.execute(query)
rel_pattern = f"[r:`{relation_name}` {{{merge_map}}}]"
else:
rel_pattern = f"[r:`{relation_name}`]"

props_literal = _props_map(props)
return (
f"MATCH (a:`{source_class}` {{{src_match}}}), "
f"(b:`{target_class}` {{{tgt_match}}})\n"
f"MERGE (a)-{rel_pattern}->(b)\n"
f"SET r += {props_literal}"
)

# ------------------------------------------------------------------
# Fetch operations
Expand All @@ -482,7 +570,19 @@ def fetch_docs(
unset_keys: list[str] | None = None,
**kwargs: Any,
) -> list[dict[str, Any]]:
"""Fetch nodes from a label."""
"""Fetch nodes from a label.

Args:
class_name: Label to fetch from.
filters: Query filters.
limit: Maximum number of nodes to return.
return_keys: Keys to return (projection).
unset_keys: Unused in Grafeo.
**kwargs: Additional parameters.

Returns:
List of fetched nodes as dictionaries.
"""
if filters is not None:
ff = FilterExpression.from_dict(filters)
filter_clause = f"WHERE {ff(doc_name='n', kind=self.expression_flavor())}"
Expand Down Expand Up @@ -527,7 +627,23 @@ def fetch_edges(
unset_keys: list[str] | None = None,
**kwargs: Any,
) -> list[dict[str, Any]]:
"""Fetch edges from the Grafeo graph."""
"""Fetch edges from the Grafeo graph.

Args:
from_type: Source node label.
from_id: Source node ID.
edge_type: Optional relationship type to filter by.
to_type: Optional target node label to filter by.
to_id: Optional target node ID to filter by.
filters: Additional query filters.
limit: Maximum number of edges to return.
return_keys: Keys to return (projection).
unset_keys: Unused in Grafeo.
**kwargs: Additional parameters.

Returns:
List of fetched edges as dictionaries.
"""
source = f"(source:`{from_type}` {{id: {_gql_literal(from_id)}}})"
rel = f"-[r:`{edge_type}`]->" if edge_type else "-[r]->"
target = f"(target:`{to_type}`)" if to_type else "(target)"
Expand Down Expand Up @@ -580,7 +696,19 @@ def fetch_present_documents(
flatten: bool = False,
filters: list[Any] | dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
"""Return documents from *batch* that already exist in the graph."""
"""Return documents from *batch* that already exist in the graph.

Args:
batch: Batch of documents to check.
class_name: Label to check in.
match_keys: Keys to match nodes.
keep_keys: Keys to keep in result.
flatten: Unused in Grafeo.
filters: Additional query filters.

Returns:
Documents that exist in the database.
"""
if not batch:
return []

Expand Down Expand Up @@ -613,7 +741,18 @@ def keep_absent_documents(
keep_keys: list[str] | tuple[str, ...] | None = None,
filters: list[Any] | dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
"""Return documents from *batch* that do NOT exist in the graph."""
"""Return documents from *batch* that do NOT exist in the graph.

Args:
batch: Batch of documents to check.
class_name: Label to check in.
match_keys: Keys to match nodes.
keep_keys: Keys to keep in result.
filters: Additional query filters.

Returns:
Documents that don't exist in the database.
"""
if not batch:
return []

Expand Down Expand Up @@ -646,7 +785,19 @@ def aggregate(
aggregated_field: str | None = None,
filters: list[Any] | dict[str, Any] | None = None,
) -> int | float | list[dict[str, Any]] | dict[str, int | float] | None:
"""Perform aggregation on a label."""
"""Perform aggregation on a label.

Args:
class_name: Label to aggregate.
aggregation_function: Type of aggregation to perform.
discriminant: Field to group by.
aggregated_field: Field to aggregate.
filters: Query filters.

Returns:
Aggregation results (dict for grouped aggregations,
int/float for single value, list for sorted unique).
"""
if filters is not None:
ff = FilterExpression.from_dict(filters)
filter_clause = f"WHERE {ff(doc_name='n', kind=self.expression_flavor())}"
Expand Down