Skip to content

Commit 4ccfac6

Browse files
committed
Pipeline utils
1 parent 350843c commit 4ccfac6

22 files changed

+1052
-742
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/Pipeline.kt

Lines changed: 101 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ package com.google.firebase.firestore
1717
import com.google.android.gms.tasks.Task
1818
import com.google.android.gms.tasks.TaskCompletionSource
1919
import com.google.firebase.Timestamp
20+
import com.google.firebase.firestore.core.Canonicalizable
21+
import com.google.firebase.firestore.model.Document
2022
import com.google.firebase.firestore.model.DocumentKey
23+
import com.google.firebase.firestore.model.MutableDocument
2124
import com.google.firebase.firestore.model.Values
2225
import com.google.firebase.firestore.pipeline.AddFieldsStage
2326
import com.google.firebase.firestore.pipeline.AggregateFunction
@@ -29,6 +32,7 @@ import com.google.firebase.firestore.pipeline.CollectionSource
2932
import com.google.firebase.firestore.pipeline.DatabaseSource
3033
import com.google.firebase.firestore.pipeline.DistinctStage
3134
import com.google.firebase.firestore.pipeline.DocumentsSource
35+
import com.google.firebase.firestore.pipeline.EvaluationContext
3236
import com.google.firebase.firestore.pipeline.Expr
3337
import com.google.firebase.firestore.pipeline.Expr.Companion.field
3438
import com.google.firebase.firestore.pipeline.ExprWithAlias
@@ -41,7 +45,6 @@ import com.google.firebase.firestore.pipeline.OffsetStage
4145
import com.google.firebase.firestore.pipeline.Ordering
4246
import com.google.firebase.firestore.pipeline.PipelineOptions
4347
import com.google.firebase.firestore.pipeline.RawStage
44-
import com.google.firebase.firestore.pipeline.RealtimePipelineOptions
4548
import com.google.firebase.firestore.pipeline.RemoveFieldsStage
4649
import com.google.firebase.firestore.pipeline.ReplaceStage
4750
import com.google.firebase.firestore.pipeline.SampleStage
@@ -52,6 +55,7 @@ import com.google.firebase.firestore.pipeline.Stage
5255
import com.google.firebase.firestore.pipeline.UnionStage
5356
import com.google.firebase.firestore.pipeline.UnnestStage
5457
import com.google.firebase.firestore.pipeline.WhereStage
58+
import com.google.firebase.firestore.util.Assert.fail
5559
import com.google.firestore.v1.ExecutePipelineRequest
5660
import com.google.firestore.v1.StructuredPipeline
5761
import com.google.firestore.v1.Value
@@ -760,7 +764,7 @@ internal constructor(
760764
firestore: FirebaseFirestore,
761765
userDataReader: UserDataReader,
762766
stages: List<Stage<*>>
763-
) : AbstractPipeline(firestore, userDataReader, stages) {
767+
) : AbstractPipeline(firestore, userDataReader, stages), Canonicalizable {
764768
internal constructor(
765769
firestore: FirebaseFirestore,
766770
userDataReader: UserDataReader,
@@ -787,31 +791,107 @@ internal constructor(
787791

788792
fun where(condition: BooleanExpr): RealtimePipeline = append(WhereStage(condition))
789793

790-
internal fun rewriteStages(): RealtimePipeline {
794+
internal val rewrittenStages: List<Stage<*>> by lazy {
791795
var hasOrder = false
792-
return with(
793-
buildList {
794-
for (stage in stages) when (stage) {
795-
// Stages whose semantics depend on ordering
796-
is LimitStage,
797-
is OffsetStage -> {
798-
if (!hasOrder) {
799-
hasOrder = true
800-
add(SortStage.BY_DOCUMENT_ID)
801-
}
802-
add(stage)
803-
}
804-
is SortStage -> {
796+
buildList {
797+
for (stage in stages) when (stage) {
798+
// Stages whose semantics depend on ordering
799+
is LimitStage,
800+
is OffsetStage -> {
801+
if (!hasOrder) {
805802
hasOrder = true
806-
add(stage.withStableOrdering())
803+
add(SortStage.BY_DOCUMENT_ID)
807804
}
808-
else -> add(stage)
805+
add(stage)
809806
}
810-
if (!hasOrder) {
811-
add(SortStage.BY_DOCUMENT_ID)
807+
is SortStage -> {
808+
hasOrder = true
809+
add(stage.withStableOrdering())
812810
}
811+
else -> add(stage)
813812
}
814-
)
813+
if (!hasOrder) {
814+
add(SortStage.BY_DOCUMENT_ID)
815+
}
816+
}
817+
}
818+
819+
override fun canonicalId(): String {
820+
return rewrittenStages.joinToString("|") { stage -> (stage as Canonicalizable).canonicalId() }
821+
}
822+
823+
override fun equals(other: Any?): Boolean {
824+
if (this === other) return true
825+
if (other !is RealtimePipeline) return false
826+
return stages == other.stages
827+
}
828+
829+
override fun hashCode(): Int {
830+
return stages.hashCode()
831+
}
832+
833+
internal fun evaluate(inputs: List<MutableDocument>): List<MutableDocument> {
834+
val context = EvaluationContext(this)
835+
return rewrittenStages.fold(inputs) { documents, stage -> stage.evaluate(context, documents) }
836+
}
837+
838+
internal fun matchesAllDocuments(): Boolean {
839+
for (stage in rewrittenStages) {
840+
// Check for LimitStage
841+
if (stage.name == "limit") {
842+
return false
843+
}
844+
845+
// Check for Where stage
846+
if (stage is WhereStage) {
847+
// Check if it's the special 'exists(__name__)' case
848+
val funcExpr = stage.condition as? FunctionExpr
849+
if (funcExpr?.name == "exists" && funcExpr.params.size == 1) {
850+
val fieldExpr = funcExpr.params[0] as? Field
851+
if (fieldExpr?.fieldPath?.isKeyField == true) {
852+
continue // This specific 'exists(__name__)' filter doesn't count
853+
}
854+
}
855+
return false
856+
}
857+
// TODO(pipeline) : Add checks for other filtering stages like Aggregate,
858+
// Distinct, FindNearest once they are implemented.
859+
}
860+
return true
861+
}
862+
863+
internal fun hasLimit(): Boolean {
864+
for (stage in rewrittenStages) {
865+
if (stage.name == "limit") {
866+
return true
867+
}
868+
// TODO(pipeline): need to check for other stages that could have a limit,
869+
// like findNearest
870+
}
871+
return false
872+
}
873+
874+
internal fun matches(doc: Document): Boolean {
875+
val result = evaluate(listOf(doc as MutableDocument))
876+
return result.isNotEmpty()
877+
}
878+
879+
private fun evaluateContext(): EvaluationContext {
880+
return EvaluationContext(this)
881+
}
882+
883+
internal fun comparator(): Comparator<Document> =
884+
getLastEffectiveSortStage().comparator(evaluateContext())
885+
886+
private fun getLastEffectiveSortStage(): SortStage {
887+
for (stage in rewrittenStages.asReversed()) {
888+
if (stage is SortStage) {
889+
return stage
890+
}
891+
// TODO(pipeline): Consider stages that might invalidate ordering later,
892+
// like fineNearest
893+
}
894+
throw fail("RealtimePipeline must contain at least one Sort stage (ensured by RewriteStages).")
815895
}
816896
}
817897

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.firebase.firestore.core
18+
19+
/** An internal interface for classes that can be canonicalized to a string representation. */
20+
internal interface Canonicalizable {
21+
fun canonicalId(): String
22+
}

0 commit comments

Comments
 (0)