Skip to content

Commit d771742

Browse files
fix: improve pipeline create_from (#1158)
Improves Query to Pipeline logic: - add support for `cursor` and `limit_to_last` - apply fewer `exists` clauses, so pipelines give the same results as matching RunQuery statements
1 parent cdec6a4 commit d771742

File tree

7 files changed

+387
-63
lines changed

7 files changed

+387
-63
lines changed

packages/google-cloud-firestore/google/cloud/firestore_v1/base_aggregation.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -361,12 +361,8 @@ def _build_pipeline(self, source: "PipelineSource"):
361361
"""
362362
Convert this query into a Pipeline
363363
364-
Queries containing a `cursor` or `limit_to_last` are not currently supported
365-
366364
Args:
367365
source: the PipelineSource to build the pipeline off of
368-
Raises:
369-
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
370366
Returns:
371367
a Pipeline representing the query
372368
"""

packages/google-cloud-firestore/google/cloud/firestore_v1/base_collection.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -608,12 +608,8 @@ def _build_pipeline(self, source: "PipelineSource"):
608608
"""
609609
Convert this query into a Pipeline
610610
611-
Queries containing a `cursor` or `limit_to_last` are not currently supported
612-
613611
Args:
614612
source: the PipelineSource to build the pipeline off o
615-
Raises:
616-
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
617613
Returns:
618614
a Pipeline representing the query
619615
"""

packages/google-cloud-firestore/google/cloud/firestore_v1/base_query.py

Lines changed: 136 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,12 +1134,8 @@ def _build_pipeline(self, source: "PipelineSource"):
11341134
"""
11351135
Convert this query into a Pipeline
11361136
1137-
Queries containing a `cursor` or `limit_to_last` are not currently supported
1138-
11391137
Args:
11401138
source: the PipelineSource to build the pipeline off of
1141-
Raises:
1142-
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
11431139
Returns:
11441140
a Pipeline representing the query
11451141
"""
@@ -1161,39 +1157,61 @@ def _build_pipeline(self, source: "PipelineSource"):
11611157
ppl = ppl.select(*[field.field_path for field in self._projection.fields])
11621158

11631159
# Orders
1164-
orders = self._normalize_orders()
1165-
if orders:
1166-
exists = []
1167-
orderings = []
1168-
for order in orders:
1169-
field = pipeline_expressions.Field.of(order.field.field_path)
1170-
exists.append(field.exists())
1171-
direction = (
1172-
"ascending"
1173-
if order.direction == StructuredQuery.Direction.ASCENDING
1174-
else "descending"
1175-
)
1176-
orderings.append(pipeline_expressions.Ordering(field, direction))
11771160

1178-
# Add exists filters to match Query's implicit orderby semantics.
1179-
if len(exists) == 1:
1180-
ppl = ppl.where(exists[0])
1181-
else:
1182-
ppl = ppl.where(pipeline_expressions.And(*exists))
1161+
# "explicit_orders" are only those explicitly added by the user via order_by().
1162+
# We only generate existence filters for these fields.
1163+
if self._orders:
1164+
exists = [
1165+
pipeline_expressions.Field.of(o.field.field_path).exists()
1166+
for o in self._orders
1167+
]
1168+
ppl = ppl.where(
1169+
pipeline_expressions.And(*exists) if len(exists) > 1 else exists[0]
1170+
)
1171+
1172+
# "normalized_orders" includes both user-specified orders and implicit orders
1173+
# (e.g. __name__ or inequality fields) required by Firestore semantics.
1174+
normalized_orders = self._normalize_orders()
1175+
orderings = [
1176+
pipeline_expressions.Ordering(
1177+
pipeline_expressions.Field.of(o.field.field_path),
1178+
"ascending"
1179+
if o.direction == StructuredQuery.Direction.ASCENDING
1180+
else "descending",
1181+
)
1182+
for o in normalized_orders
1183+
]
1184+
1185+
# Apply cursors as filters.
1186+
if orderings:
1187+
for cursor, is_start in [(self._start_at, True), (self._end_at, False)]:
1188+
cursor = self._normalize_cursor(cursor, normalized_orders)
1189+
if cursor:
1190+
ppl = ppl.where(
1191+
_where_conditions_from_cursor(cursor, orderings, is_start)
1192+
)
1193+
1194+
# Handle sort and limit, including limit_to_last semantics.
1195+
is_limit_to_last = self._limit_to_last and bool(orderings)
11831196

1184-
# Add sort orderings
1197+
if is_limit_to_last:
1198+
# If limit_to_last is set, we need to reverse the orderings to find the
1199+
# "last" N documents (which effectively become the "first" N in reverse order).
1200+
ppl = ppl.sort(*_reverse_orderings(orderings))
1201+
elif orderings:
11851202
ppl = ppl.sort(*orderings)
11861203

1187-
# Cursors, Limit and Offset
1188-
if self._start_at or self._end_at or self._limit_to_last:
1189-
raise NotImplementedError(
1190-
"Query to Pipeline conversion: cursors and limit_to_last is not supported yet."
1191-
)
1192-
else: # Limit & Offset without cursors
1193-
if self._offset:
1194-
ppl = ppl.offset(self._offset)
1195-
if self._limit:
1196-
ppl = ppl.limit(self._limit)
1204+
if self._limit is not None and (not self._limit_to_last or orderings):
1205+
ppl = ppl.limit(self._limit)
1206+
1207+
if is_limit_to_last:
1208+
# If we reversed the orderings for limit_to_last, we must now re-sort
1209+
# using the original orderings to return the results in the user-requested order.
1210+
ppl = ppl.sort(*orderings)
1211+
1212+
# Offset
1213+
if self._offset:
1214+
ppl = ppl.offset(self._offset)
11971215

11981216
return ppl
11991217

@@ -1366,6 +1384,91 @@ def _cursor_pb(cursor_pair: Optional[Tuple[list, bool]]) -> Optional[Cursor]:
13661384
return None
13671385

13681386

1387+
def _get_cursor_exclusive_condition(
1388+
is_start_cursor: bool,
1389+
ordering: pipeline_expressions.Ordering,
1390+
value: pipeline_expressions.Constant,
1391+
) -> pipeline_expressions.BooleanExpression:
1392+
"""
1393+
Helper to determine the correct comparison operator (greater_than or less_than)
1394+
based on the cursor type (start/end) and the sort direction (ascending/descending).
1395+
"""
1396+
field = ordering.expr
1397+
if (
1398+
is_start_cursor
1399+
and ordering.order_dir == pipeline_expressions.Ordering.Direction.ASCENDING
1400+
) or (
1401+
not is_start_cursor
1402+
and ordering.order_dir == pipeline_expressions.Ordering.Direction.DESCENDING
1403+
):
1404+
return field.greater_than(value)
1405+
else:
1406+
return field.less_than(value)
1407+
1408+
1409+
def _where_conditions_from_cursor(
1410+
cursor: Tuple[List, bool],
1411+
orderings: List[pipeline_expressions.Ordering],
1412+
is_start_cursor: bool,
1413+
) -> pipeline_expressions.BooleanExpression:
1414+
"""
1415+
Converts a cursor into a filter condition for the pipeline.
1416+
1417+
Args:
1418+
cursor: The cursor values and the 'before' flag.
1419+
orderings: The list of ordering expressions used in the query.
1420+
is_start_cursor: True if this is a start_at/start_after cursor, False if it is an end_at/end_before cursor.
1421+
Returns:
1422+
A BooleanExpression representing the cursor condition.
1423+
"""
1424+
cursor_values, before = cursor
1425+
size = len(cursor_values)
1426+
1427+
ordering = orderings[size - 1]
1428+
field = ordering.expr
1429+
value = pipeline_expressions.Constant(cursor_values[size - 1])
1430+
1431+
# Add condition for last bound
1432+
condition = _get_cursor_exclusive_condition(is_start_cursor, ordering, value)
1433+
1434+
if (is_start_cursor and before) or (not is_start_cursor and not before):
1435+
# When the cursor bound is inclusive, then the last bound
1436+
# can be equal to the value, otherwise it's not equal
1437+
condition = pipeline_expressions.Or(condition, field.equal(value))
1438+
1439+
# Iterate backwards over the remaining bounds, adding a condition for each one
1440+
for i in range(size - 2, -1, -1):
1441+
ordering = orderings[i]
1442+
field = ordering.expr
1443+
value = pipeline_expressions.Constant(cursor_values[i])
1444+
1445+
# For each field in the orderings, the condition is either
1446+
# a) lessThan|greaterThan the cursor value,
1447+
# b) or equal the cursor value and lessThan|greaterThan the cursor values for other fields
1448+
exclusive_condition = _get_cursor_exclusive_condition(
1449+
is_start_cursor, ordering, value
1450+
)
1451+
condition = pipeline_expressions.Or(
1452+
exclusive_condition,
1453+
pipeline_expressions.And(field.equal(value), condition),
1454+
)
1455+
1456+
return condition
1457+
1458+
1459+
def _reverse_orderings(
1460+
orderings: List[pipeline_expressions.Ordering],
1461+
) -> List[pipeline_expressions.Ordering]:
1462+
reversed_orderings = []
1463+
for o in orderings:
1464+
if o.order_dir == pipeline_expressions.Ordering.Direction.ASCENDING:
1465+
new_dir = "descending"
1466+
else:
1467+
new_dir = "ascending"
1468+
reversed_orderings.append(pipeline_expressions.Ordering(o.expr, new_dir))
1469+
return reversed_orderings
1470+
1471+
13691472
def _query_response_to_snapshot(
13701473
response_pb: RunQueryResponse, collection, expected_prefix: str
13711474
) -> Optional[document.DocumentSnapshot]:

packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1833,15 +1833,21 @@ def _from_query_filter_pb(filter_pb, client):
18331833
elif filter_pb.op == Query_pb.FieldFilter.Operator.EQUAL:
18341834
return And(field.exists(), field.equal(value))
18351835
elif filter_pb.op == Query_pb.FieldFilter.Operator.NOT_EQUAL:
1836-
return And(field.exists(), field.not_equal(value))
1836+
# In Enterprise DBs NOT_EQUAL will match a field that does not exist,
1837+
# therefore we do not want an existence filter for the NOT_EQUAL conversion
1838+
# so the Query and Pipeline behavior are consistent in Enterprise.
1839+
return field.not_equal(value)
18371840
if filter_pb.op == Query_pb.FieldFilter.Operator.ARRAY_CONTAINS:
18381841
return And(field.exists(), field.array_contains(value))
18391842
elif filter_pb.op == Query_pb.FieldFilter.Operator.ARRAY_CONTAINS_ANY:
18401843
return And(field.exists(), field.array_contains_any(value))
18411844
elif filter_pb.op == Query_pb.FieldFilter.Operator.IN:
18421845
return And(field.exists(), field.equal_any(value))
18431846
elif filter_pb.op == Query_pb.FieldFilter.Operator.NOT_IN:
1844-
return And(field.exists(), field.not_equal_any(value))
1847+
# In Enterprise DBs NOT_IN will match a field that does not exist,
1848+
# therefore we do not want an existence filter for the NOT_IN conversion
1849+
# so the Query and Pipeline behavior are consistent in Enterprise.
1850+
return field.not_equal_any(value)
18451851
else:
18461852
raise TypeError(f"Unexpected FieldFilter operator type: {filter_pb.op}")
18471853
elif isinstance(filter_pb, Query_pb.Filter):

packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,8 @@ def create_from(
5757
"""
5858
Create a pipeline from an existing query
5959
60-
Queries containing a `cursor` or `limit_to_last` are not currently supported
61-
6260
Args:
6361
query: the query to build the pipeline off of
64-
Raises:
65-
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
6662
Returns:
6763
a new pipeline instance representing the query
6864
"""

0 commit comments

Comments
 (0)