@@ -21,6 +21,7 @@ import com.google.firebase.firestore.core.Canonicalizable
21
21
import com.google.firebase.firestore.model.Document
22
22
import com.google.firebase.firestore.model.DocumentKey
23
23
import com.google.firebase.firestore.model.MutableDocument
24
+ import com.google.firebase.firestore.model.ResourcePath
24
25
import com.google.firebase.firestore.model.Values
25
26
import com.google.firebase.firestore.pipeline.AddFieldsStage
26
27
import com.google.firebase.firestore.pipeline.AggregateFunction
@@ -43,7 +44,6 @@ import com.google.firebase.firestore.pipeline.InternalOptions
43
44
import com.google.firebase.firestore.pipeline.LimitStage
44
45
import com.google.firebase.firestore.pipeline.OffsetStage
45
46
import com.google.firebase.firestore.pipeline.Ordering
46
- import com.google.firebase.firestore.pipeline.PipelineOptions
47
47
import com.google.firebase.firestore.pipeline.RawStage
48
48
import com.google.firebase.firestore.pipeline.RemoveFieldsStage
49
49
import com.google.firebase.firestore.pipeline.ReplaceStage
@@ -55,17 +55,28 @@ import com.google.firebase.firestore.pipeline.Stage
55
55
import com.google.firebase.firestore.pipeline.UnionStage
56
56
import com.google.firebase.firestore.pipeline.UnnestStage
57
57
import com.google.firebase.firestore.pipeline.WhereStage
58
+ import com.google.firebase.firestore.remote.RemoteSerializer
58
59
import com.google.firebase.firestore.util.Assert.fail
59
60
import com.google.firestore.v1.ExecutePipelineRequest
60
61
import com.google.firestore.v1.StructuredPipeline
61
62
import com.google.firestore.v1.Value
62
63
63
- open class AbstractPipeline
64
+ class Pipeline
64
65
internal constructor (
65
- internal val firestore: FirebaseFirestore ,
66
- internal val userDataReader: UserDataReader ,
67
- internal val stages: List <Stage <* >>
66
+ private val firestore: FirebaseFirestore ,
67
+ private val userDataReader: UserDataReader ,
68
+ private val stages: List <Stage <* >>
68
69
) {
70
+ internal constructor (
71
+ firestore: FirebaseFirestore ,
72
+ userDataReader: UserDataReader ,
73
+ stage: Stage <* >
74
+ ) : this (firestore, userDataReader, listOf (stage))
75
+
76
+ private fun append (stage : Stage <* >): Pipeline {
77
+ return Pipeline (firestore, userDataReader, stages.plus(stage))
78
+ }
79
+
69
80
private fun toStructuredPipelineProto (options : InternalOptions ? ): StructuredPipeline {
70
81
val builder = StructuredPipeline .newBuilder()
71
82
builder.pipeline = toPipelineProto()
@@ -79,17 +90,17 @@ internal constructor(
79
90
.build()
80
91
81
92
private fun toExecutePipelineRequest (options : InternalOptions ? ): ExecutePipelineRequest {
82
- val database = firestore.databaseId
93
+ val database = firestore!! .databaseId
83
94
val builder = ExecutePipelineRequest .newBuilder()
84
95
builder.database = " projects/${database.projectId} /databases/${database.databaseId} "
85
96
builder.structuredPipeline = toStructuredPipelineProto(options)
86
97
return builder.build()
87
98
}
88
99
89
- protected fun execute (options : InternalOptions ? ): Task <PipelineSnapshot > {
100
+ fun execute (options : InternalOptions ? ): Task <PipelineSnapshot > {
90
101
val request = toExecutePipelineRequest(options)
91
102
val observerTask = ObserverSnapshotTask ()
92
- firestore.callClient { call -> call!! .executePipeline(request, observerTask) }
103
+ firestore? .callClient { call -> call!! .executePipeline(request, observerTask) }
93
104
return observerTask.task
94
105
}
95
106
@@ -106,7 +117,7 @@ internal constructor(
106
117
) {
107
118
results.add(
108
119
PipelineResult (
109
- firestore,
120
+ firestore!! ,
110
121
userDataWriter,
111
122
if (key == null ) null else DocumentReference (key, firestore),
112
123
data,
@@ -127,28 +138,9 @@ internal constructor(
127
138
val task: Task <PipelineSnapshot >
128
139
get() = taskCompletionSource.task
129
140
}
130
- }
131
-
132
- class Pipeline
133
- private constructor (
134
- firestore: FirebaseFirestore ,
135
- userDataReader: UserDataReader ,
136
- stages: List <Stage <* >>
137
- ) : AbstractPipeline (firestore, userDataReader, stages) {
138
- internal constructor (
139
- firestore: FirebaseFirestore ,
140
- userDataReader: UserDataReader ,
141
- stage: Stage <* >
142
- ) : this (firestore, userDataReader, listOf (stage))
143
-
144
- private fun append (stage : Stage <* >): Pipeline {
145
- return Pipeline (firestore, userDataReader, stages.plus(stage))
146
- }
147
141
148
142
fun execute (): Task <PipelineSnapshot > = execute(null )
149
143
150
- fun execute (options : PipelineOptions ): Task <PipelineSnapshot > = execute(options.options)
151
-
152
144
internal fun documentReference (key : DocumentKey ): DocumentReference {
153
145
return DocumentReference (key, firestore)
154
146
}
@@ -627,7 +619,7 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
627
619
* @param path A path to a collection that will be the source of this pipeline.
628
620
* @return A new [Pipeline] object with documents from target collection.
629
621
*/
630
- fun collection (path : String ): Pipeline = collection(CollectionSource .of (path))
622
+ fun collection (path : String ): Pipeline = collection(firestore.collection (path))
631
623
632
624
/* *
633
625
* Set the pipeline's source to the collection specified by the given [CollectionReference].
@@ -637,7 +629,8 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
637
629
* @throws [IllegalArgumentException] Thrown if the [ref] provided targets a different project or
638
630
* database than the pipeline.
639
631
*/
640
- fun collection (ref : CollectionReference ): Pipeline = collection(CollectionSource .of(ref))
632
+ fun collection (ref : CollectionReference ): Pipeline =
633
+ collection(CollectionSource .of(ref, firestore.databaseId))
641
634
642
635
/* *
643
636
* Set the pipeline's source to the collection specified by CollectionSource.
@@ -648,7 +641,7 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
648
641
* or database than the pipeline.
649
642
*/
650
643
fun collection (stage : CollectionSource ): Pipeline {
651
- if (stage.firestore != null && stage.firestore. databaseId != firestore.databaseId) {
644
+ if (stage.serializer. databaseId() != firestore.databaseId) {
652
645
throw IllegalArgumentException (" Provided collection is from a different Firestore instance." )
653
646
}
654
647
return Pipeline (firestore, firestore.userDataReader, stage)
@@ -661,9 +654,9 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
661
654
* @return A new [Pipeline] object with documents from target collection group.
662
655
*/
663
656
fun collectionGroup (collectionId : String ): Pipeline =
664
- pipeline (CollectionGroupSource .of((collectionId)))
657
+ collectionGroup (CollectionGroupSource .of((collectionId)))
665
658
666
- fun pipeline (stage : CollectionGroupSource ): Pipeline =
659
+ internal fun collectionGroup (stage : CollectionGroupSource ): Pipeline =
667
660
Pipeline (firestore, firestore.userDataReader, stage)
668
661
669
662
/* *
@@ -706,20 +699,34 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
706
699
return Pipeline (
707
700
firestore,
708
701
firestore.userDataReader,
709
- DocumentsSource (documents.map { docRef -> " / " + docRef. path }.toTypedArray())
702
+ DocumentsSource (documents.map { ResourcePath .fromString(it. path) }.toTypedArray())
710
703
)
711
704
}
712
705
}
713
706
714
707
class RealtimePipelineSource internal constructor(private val firestore : FirebaseFirestore ) {
708
+ /* *
709
+ * Convert the given Query into an equivalent Pipeline.
710
+ *
711
+ * @param query A Query to be converted into a Pipeline.
712
+ * @return A new [Pipeline] object that is equivalent to [query]
713
+ * @throws [IllegalArgumentException] Thrown if the [query] provided targets a different project
714
+ * or database than the pipeline.
715
+ */
716
+ fun convertFrom (query : Query ): RealtimePipeline {
717
+ if (query.firestore.databaseId != firestore.databaseId) {
718
+ throw IllegalArgumentException (" Provided query is from a different Firestore instance." )
719
+ }
720
+ return query.query.toRealtimePipeline(firestore, firestore.userDataReader)
721
+ }
715
722
716
723
/* *
717
724
* Set the pipeline's source to the collection specified by the given path.
718
725
*
719
726
* @param path A path to a collection that will be the source of this pipeline.
720
727
* @return A new [RealtimePipeline] object with documents from target collection.
721
728
*/
722
- fun collection (path : String ): RealtimePipeline = collection(CollectionSource .of (path))
729
+ fun collection (path : String ): RealtimePipeline = collection(firestore.collection (path))
723
730
724
731
/* *
725
732
* Set the pipeline's source to the collection specified by the given [CollectionReference].
@@ -729,7 +736,8 @@ class RealtimePipelineSource internal constructor(private val firestore: Firebas
729
736
* @throws [IllegalArgumentException] Thrown if the [ref] provided targets a different project or
730
737
* database than the pipeline.
731
738
*/
732
- fun collection (ref : CollectionReference ): RealtimePipeline = collection(CollectionSource .of(ref))
739
+ fun collection (ref : CollectionReference ): RealtimePipeline =
740
+ collection(CollectionSource .of(ref, firestore.databaseId))
733
741
734
742
/* *
735
743
* Set the pipeline's source to the collection specified by CollectionSource.
@@ -740,10 +748,10 @@ class RealtimePipelineSource internal constructor(private val firestore: Firebas
740
748
* or database than the pipeline.
741
749
*/
742
750
fun collection (stage : CollectionSource ): RealtimePipeline {
743
- if (stage.firestore != null && stage.firestore. databaseId != firestore.databaseId) {
751
+ if (stage.serializer. databaseId() != firestore.databaseId) {
744
752
throw IllegalArgumentException (" Provided collection is from a different Firestore instance." )
745
753
}
746
- return RealtimePipeline (firestore, firestore.userDataReader, stage)
754
+ return RealtimePipeline (RemoteSerializer ( firestore.databaseId) , firestore.userDataReader, stage)
747
755
}
748
756
749
757
/* *
@@ -753,26 +761,26 @@ class RealtimePipelineSource internal constructor(private val firestore: Firebas
753
761
* @return A new [RealtimePipeline] object with documents from target collection group.
754
762
*/
755
763
fun collectionGroup (collectionId : String ): RealtimePipeline =
756
- pipeline (CollectionGroupSource .of((collectionId)))
764
+ collectionGroup (CollectionGroupSource .of((collectionId)))
757
765
758
- fun pipeline (stage : CollectionGroupSource ): RealtimePipeline =
759
- RealtimePipeline (firestore, firestore.userDataReader, stage)
766
+ fun collectionGroup (stage : CollectionGroupSource ): RealtimePipeline =
767
+ RealtimePipeline (RemoteSerializer ( firestore.databaseId) , firestore.userDataReader, stage)
760
768
}
761
769
762
770
class RealtimePipeline
763
771
internal constructor (
764
- firestore : FirebaseFirestore ,
765
- userDataReader: UserDataReader ,
766
- stages: List <Stage <* >>
767
- ) : AbstractPipeline (firestore, userDataReader, stages), Canonicalizable {
772
+ internal val serializer : RemoteSerializer ,
773
+ internal val userDataReader: UserDataReader ,
774
+ internal val stages: List <Stage <* >>
775
+ ) : Canonicalizable {
768
776
internal constructor (
769
- firestore : FirebaseFirestore ,
777
+ serializer : RemoteSerializer ,
770
778
userDataReader: UserDataReader ,
771
779
stage: Stage <* >
772
- ) : this (firestore , userDataReader, listOf (stage))
780
+ ) : this (serializer , userDataReader, listOf (stage))
773
781
774
782
private fun with (stages : List <Stage <* >>): RealtimePipeline =
775
- RealtimePipeline (firestore , userDataReader, stages)
783
+ RealtimePipeline (serializer , userDataReader, stages)
776
784
777
785
private fun append (stage : Stage <* >): RealtimePipeline = with (stages.plus(stage))
778
786
@@ -820,14 +828,17 @@ internal constructor(
820
828
return rewrittenStages.joinToString(" |" ) { stage -> (stage as Canonicalizable ).canonicalId() }
821
829
}
822
830
831
+ override fun toString (): String = canonicalId()
832
+
823
833
override fun equals (other : Any? ): Boolean {
824
834
if (this == = other) return true
825
835
if (other !is RealtimePipeline ) return false
826
- return stages == other.stages
836
+ if (serializer.databaseId() != other.serializer.databaseId()) return false
837
+ return rewrittenStages == other.rewrittenStages
827
838
}
828
839
829
840
override fun hashCode (): Int {
830
- return stages.hashCode()
841
+ return serializer.databaseId().hashCode() * 31 + stages.hashCode()
831
842
}
832
843
833
844
internal fun evaluate (inputs : List <MutableDocument >): List <MutableDocument > {
@@ -883,6 +894,15 @@ internal constructor(
883
894
internal fun comparator (): Comparator <Document > =
884
895
getLastEffectiveSortStage().comparator(evaluateContext())
885
896
897
+ internal fun toStructurePipelineProto (): StructuredPipeline {
898
+ val builder = StructuredPipeline .newBuilder()
899
+ builder.pipeline =
900
+ com.google.firestore.v1.Pipeline .newBuilder()
901
+ .addAllStages(rewrittenStages.map { it.toProtoStage(userDataReader) })
902
+ .build()
903
+ return builder.build()
904
+ }
905
+
886
906
private fun getLastEffectiveSortStage (): SortStage {
887
907
for (stage in rewrittenStages.asReversed()) {
888
908
if (stage is SortStage ) {
0 commit comments