Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING, Generic, TypeVar
from typing import Generic, TypeVar, TYPE_CHECKING

from google.cloud.firestore_v1 import pipeline_stages as stages
from google.cloud.firestore_v1._helpers import DOCUMENT_PATH_DELIMITER
Expand All @@ -32,6 +32,7 @@
from google.cloud.firestore_v1.base_document import BaseDocumentReference
from google.cloud.firestore_v1.base_query import BaseQuery
from google.cloud.firestore_v1.client import Client
from google.cloud.firestore_v1.pipeline_expressions import CONSTANT_TYPE, Expression


PipelineType = TypeVar("PipelineType", bound=_BasePipeline)
Expand Down Expand Up @@ -108,3 +109,65 @@ def documents(self, *docs: "BaseDocumentReference") -> PipelineType:
a new pipeline instance targeting the specified documents
"""
return self._create_pipeline(stages.Documents.of(*docs))

def literals(
self, *documents: dict[str, Expression | CONSTANT_TYPE]
) -> PipelineType:
"""
Returns documents from a fixed set of predefined document objects.

Example:
>>> from google.cloud.firestore_v1.pipeline_expressions import Constant
>>> documents = [
... {"name": "joe", "age": 10},
... {"name": "bob", "age": 30},
... {"name": "alice", "age": 40}
... ]
>>> pipeline = client.pipeline()
... .literals(*documents)
... .where(field("age").lessThan(35))

Output documents:
```json
[
{"name": "joe", "age": 10},
{"name": "bob", "age": 30}
]
```

Behavior:
The `literals(...)` stage can only be used as the first stage in a pipeline (or
sub-pipeline). The order of documents returned from the `literals` matches the
order in which they are defined.

While literal values are the most common, it is also possible to pass in
expressions, which will be evaluated and returned, making it possible to test
out different query / expression behavior without first needing to create some
test data.

For example, the following shows how to quickly test out the `length(...)`
function on some constant test sets:

Example:
>>> from google.cloud.firestore_v1.pipeline_expressions import Constant
>>> documents = [
... {"x": Constant.of("foo-bar-baz").char_length()},
... {"x": Constant.of("bar").char_length()}
... ]
>>> pipeline = client.pipeline().literals(*documents)

Output documents:
```json
[
{"x": 11},
{"x": 3}
]
```

Args:
*documents: One or more documents to be returned by this stage. Each can be a `dict`
of values of `Expression` or `CONSTANT_TYPE` types.
Returns:
A new Pipeline object with this stage appended to the stage list.
"""
return self._create_pipeline(stages.Literals(*documents))
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@

from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Optional, Sequence
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence

from google.cloud.firestore_v1._helpers import encode_value
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from google.cloud.firestore_v1.pipeline_expressions import (
AggregateFunction,
AliasedExpression,
BooleanExpression,
CONSTANT_TYPE,
Expression,
Field,
Ordering,
Expand Down Expand Up @@ -342,6 +343,26 @@ def _pb_args(self):
return [Value(integer_value=self.limit)]


class Literals(Stage):
"""Returns documents from a fixed set of predefined document objects."""

def __init__(self, *documents: Mapping[str, Expression | CONSTANT_TYPE]):
super().__init__("literals")
self.documents: tuple[Mapping[str, Any], ...] = documents

def _pb_args(self):
args = []
for doc in self.documents:
encoded_doc = {}
for k, v in doc.items():
if hasattr(v, "_to_pb"):
encoded_doc[k] = v._to_pb()
else:
encoded_doc[k] = encode_value(v)
args.append(Value(map_value={"fields": encoded_doc}))
return args


class Offset(Stage):
"""Skips a specified number of documents."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,4 +684,63 @@ tests:
- args:
- fieldReferenceValue: awards
- stringValue: full_replace
name: replace_with
name: replace_with
- description: literals
pipeline:
- Literals:
- title: "The Hitchhiker's Guide to the Galaxy"
author: "Douglas Adams"
- genre: "Science Fiction"
year: 1979
assert_results:
- title: "The Hitchhiker's Guide to the Galaxy"
author: "Douglas Adams"
- genre: "Science Fiction"
year: 1979
assert_proto:
pipeline:
stages:
- args:
- mapValue:
fields:
author:
stringValue: "Douglas Adams"
title:
stringValue: "The Hitchhiker's Guide to the Galaxy"
- mapValue:
fields:
genre:
stringValue: "Science Fiction"
year:
integerValue: '1979'
name: literals
- description: literals_with_expression_input
pipeline:
- Literals:
- res:
FunctionExpression.string_concat:
- Constant: "A"
- Constant: "B"
- Select:
- res
assert_results:
- res: "AB"
assert_proto:
pipeline:
stages:
- args:
- mapValue:
fields:
res:
functionValue:
args:
- stringValue: "A"
- stringValue: "B"
name: string_concat
name: literals
- args:
- mapValue:
fields:
res:
fieldReferenceValue: res
name: select
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ def test_documents(self):
assert first_stage.paths[1] == "/a/2"
assert first_stage.paths[2] == "/a/3"

def test_literals(self):
from google.cloud.firestore_v1.pipeline_expressions import Field

instance = self._make_client().pipeline()
documents = ({"field": Field.of("a")}, {"name": "joe"})
ppl = instance.literals(*documents)
assert isinstance(ppl, self._expected_pipeline_type)
assert len(ppl.stages) == 1
first_stage = ppl.stages[0]
assert isinstance(first_stage, stages.Literals)


class TestPipelineSourceWithAsyncClient(TestPipelineSource):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,112 @@ def test_to_pb(self):
assert len(result.options) == 0


class TestLiterals:
def _make_one(self, *args, **kwargs):
return stages.Literals(*args, **kwargs)

def test_ctor(self):
val1 = {"a": Field.of("a")}
val2 = {"b": 2}
instance = self._make_one(val1, val2)
assert instance.documents == (val1, val2)
assert instance.name == "literals"

def test_ctor_extended_types(self):
import datetime
from google.cloud.firestore_v1._helpers import GeoPoint
from google.cloud.firestore_v1.vector import Vector

doc = {
"a": 1,
"b": "string",
"c": 3.14,
"d": True,
"e": None,
"f": b"bytes",
"g": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc),
"h": GeoPoint(1.0, 2.0),
"i": Vector([1.0, 2.0]),
}
instance = self._make_one(doc)
assert instance.documents == (doc,)
assert instance.name == "literals"

def test_ctor_w_expressions(self):
from google.cloud.firestore_v1.pipeline_expressions import FunctionExpression

expr = FunctionExpression("string_concat", [Constant("A"), Constant("B")])
doc = {"res": expr}
instance = self._make_one(doc)
assert instance.documents == (doc,)
assert instance.name == "literals"

def test_repr(self):
from google.cloud.firestore_v1.pipeline_expressions import Field

val1 = {"a": Field.of("a")}
instance = self._make_one(val1, {"b": 2})
repr_str = repr(instance)
assert repr_str == "Literals(documents=({'a': Field.of('a')}, {'b': 2}))"

def test_to_pb_constant_types(self):
import datetime
from google.cloud.firestore_v1._helpers import GeoPoint
from google.cloud.firestore_v1.vector import Vector

doc = {
"a": 1,
"b": "string",
"c": 3.14,
"d": True,
"e": None,
"f": b"bytes",
"g": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc),
"h": GeoPoint(1.0, 2.0),
"i": Vector([1.0, 2.0]),
}
instance = self._make_one(doc)
result = instance._to_pb()
assert result.name == "literals"
assert len(result.args) == 1

fields = result.args[0].map_value.fields
assert fields["a"].integer_value == 1
assert fields["b"].string_value == "string"
assert fields["c"].double_value == 3.14
assert fields["d"].boolean_value is True
assert fields["e"].null_value == 0
assert fields["f"].bytes_value == b"bytes"
assert fields["g"].timestamp_value == datetime.datetime(
2025, 1, 1, tzinfo=datetime.timezone.utc
)
assert fields["h"].geo_point_value.latitude == 1.0
assert fields["h"].geo_point_value.longitude == 2.0
assert (
fields["i"].map_value.fields["value"].array_value.values[0].double_value
== 1.0
)
assert (
fields["i"].map_value.fields["value"].array_value.values[1].double_value
== 2.0
)

def test_to_pb_w_expression(self):
from google.cloud.firestore_v1.pipeline_expressions import FunctionExpression

expr = FunctionExpression("string_concat", [Constant("A"), Constant("B")])
doc = {"res": expr}
instance = self._make_one(doc)
result = instance._to_pb()
assert result.name == "literals"
assert len(result.args) == 1

fields = result.args[0].map_value.fields
assert fields["res"].function_value.name == "string_concat"
assert fields["res"].function_value.args[0].string_value == "A"
assert fields["res"].function_value.args[1].string_value == "B"


class TestOffset:
def _make_one(self, *args, **kwargs):
return stages.Offset(*args, **kwargs)
Expand Down
Loading