11class FlowTestObserver <T >(
2- coroutineScope : CoroutineScope ,
3- private val flow : Flow <T >
2+ private val coroutineScope : CoroutineScope ,
3+ private val flow : Flow <T >,
4+ private val waitForDelay : Boolean = false
45) {
56 private val testValues = mutableListOf<T >()
67 private var error: Throwable ? = null
78
8- // private val job: Job = coroutineScope.launch {
9- // flow
10- // .catch { throwable ->
11- // error = throwable
12- // }
13- // .collect {
14- // testValues.add(it)
15- // }
16- // }
17-
18- private val job: Job = flow
19- .catch { throwable ->
20- error = throwable
9+ private var isInitialized = false
10+
11+ private var isCompleted = false
12+
13+ private lateinit var job: Job
14+
15+ private suspend fun init () {
16+ job = createJob(coroutineScope)
17+
18+ // Wait this job after end of possible delays
19+ // job.join()
20+ }
21+
22+ private suspend fun initialize () {
23+
24+ if (! isInitialized) {
25+
26+ if (waitForDelay) {
27+ try {
28+ withTimeout(Long .MAX_VALUE ) {
29+ job = createJob(this )
30+ }
31+ } catch (e: Exception ) {
32+ isCompleted = false
33+ }
34+ } else {
35+ job = createJob(coroutineScope)
36+ }
2137 }
22- .onEach { testValues.add(it) }
23- .launchIn(scope = coroutineScope)
38+ }
2439
40+ private fun createJob (scope : CoroutineScope ): Job {
41+
42+ val job = flow
43+ .onStart { isInitialized = true }
44+ .onCompletion { cause ->
45+ isCompleted = (cause == null )
46+ }
47+ .catch { throwable ->
48+ error = throwable
49+ }
50+ .onEach { testValues.add(it) }
51+ .launchIn(scope)
52+ return job
53+ }
54+
55+ suspend fun assertNoValue (): FlowTestObserver <T > {
56+
57+ initialize()
2558
26- fun assertNoValues (): FlowTestObserver <T > {
2759 if (testValues.isNotEmpty()) throw AssertionError (
2860 " Assertion error! Actual size ${testValues.size} "
2961 )
3062 return this
3163 }
3264
33- fun assertValueCount (count : Int ): FlowTestObserver <T > {
65+ suspend fun assertValueCount (count : Int ): FlowTestObserver <T > {
66+
67+ initialize()
68+
3469 if (count < 0 ) throw AssertionError (
3570 " Assertion error! Value count cannot be smaller than zero"
3671 )
@@ -40,84 +75,195 @@ class FlowTestObserver<T>(
4075 return this
4176 }
4277
43- fun assertValues (vararg values : T ): FlowTestObserver <T > {
78+ suspend fun assertValues (vararg values : T ): FlowTestObserver <T > {
79+
80+ initialize()
81+
4482 if (! testValues.containsAll(values.asList()))
45- throw AssertionError (" Assertion error! At least one value does not match" )
83+ throw AssertionError (" Assertion error! At least one value does not match" )
4684 return this
4785 }
4886
49- // fun assertValues(predicate: List<T>.() -> Boolean): FlowTestObserver<T> {
50- // if (!testValues.predicate())
51- // throw AssertionError("Assertion error! At least one value does not match")
52- // return this
53- // }
54- //
55- // fun values(predicate: List<T>.() -> Unit): FlowTestObserver<T> {
56- // testValues.predicate()
57- // return this
58- // }
87+ suspend fun assertValues (predicate : (List <T >) -> Boolean ): FlowTestObserver <T > {
88+
89+ initialize()
5990
60- fun assertValues (predicate : (List <T >) -> Boolean ): FlowTestObserver <T > {
6191 if (! predicate(testValues))
62- throw AssertionError (" Assertion error! At least one value does not match" )
92+ throw AssertionError (" Assertion error! At least one value does not match" )
6393 return this
6494 }
6595
66- fun assertError (throwable : Throwable ): FlowTestObserver <T > {
96+ /* *
97+ * Asserts that this [FlowTestObserver] received exactly one [Flow.onEach] or [Flow.collect]
98+ * value for which the provided predicate returns `true`.
99+ */
100+ suspend fun assertValue (predicate : (T ) -> Boolean ): FlowTestObserver <T > {
101+ return assertValueAt(0 , predicate)
102+ }
67103
68- val errorNotNull = exceptionNotNull()
104+ suspend fun assertValueAt (index : Int , predicate : (T ) -> Boolean ): FlowTestObserver <T > {
105+
106+ initialize()
107+
108+ if (testValues.size == 0 ) throw AssertionError (" Assertion error! No values" )
109+
110+ if (index < 0 ) throw AssertionError (
111+ " Assertion error! Index cannot be smaller than zero"
112+ )
113+
114+ if (index > testValues.size) throw AssertionError (
115+ " Assertion error! Invalid index: $index "
116+ )
117+
118+ if (! predicate(testValues[index]))
119+ throw AssertionError (" Assertion error! At least one value does not match" )
69120
70- if (! (errorNotNull::class .java == throwable::class .java &&
71- errorNotNull.message == throwable.message)
121+ return this
122+ }
123+
124+ suspend fun assertValueAt (index : Int , value : T ): FlowTestObserver <T > {
125+
126+ initialize()
127+
128+ if (testValues.size == 0 ) throw AssertionError (" Assertion error! No values" )
129+
130+ if (index < 0 ) throw AssertionError (
131+ " Assertion error! Index cannot be smaller than zero"
132+ )
133+
134+ if (index > testValues.size) throw AssertionError (
135+ " Assertion error! Invalid index: $index "
72136 )
73- throw AssertionError (" Assertion Error! throwable: $throwable does not match $errorNotNull " )
137+
138+ if (testValues[index] != value)
139+ throw AssertionError (" Assertion Error Objects don't match" )
74140
75141 return this
76142 }
77143
78- fun assertError (errorClass : Class <Throwable >): FlowTestObserver <T > {
144+ /* *
145+ * Asserts that this [FlowTestObserver] received
146+ * [Flow.catch] the exact same throwable. Since most exceptions don't implement `equals`
147+ * it would be better to call overload to test against the class of
148+ * an error instead of an instance of an error
149+ */
150+ suspend fun assertError (throwable : Throwable ): FlowTestObserver <T > {
151+
152+ initialize()
79153
80154 val errorNotNull = exceptionNotNull()
81155
82- if (errorNotNull::class .java != errorClass)
83- throw AssertionError (" Assertion Error! errorClass $errorClass does not match ${errorNotNull::class .java} " )
156+ if (! (
157+ errorNotNull::class .java == throwable::class .java &&
158+ errorNotNull.message == throwable.message
159+ )
160+ )
161+ throw AssertionError (
162+ " Assertion Error! " +
163+ " throwable: $throwable does not match $errorNotNull "
164+ )
165+ return this
166+ }
167+
168+ /* *
169+ * Asserts that this [FlowTestObserver] received
170+ * [Flow.catch] which is an instance of the specified errorClass Class.
171+ */
172+ suspend fun assertError (errorClass : Class <out Throwable >): FlowTestObserver <T > {
173+
174+ initialize()
175+
176+ val errorNotNull = exceptionNotNull()
84177
178+ if (errorNotNull::class .java != errorClass)
179+ throw AssertionError (
180+ " Assertion Error! errorClass $errorClass " +
181+ " does not match ${errorNotNull::class .java} "
182+ )
85183 return this
86184 }
87185
88- fun assertError (predicate : (Throwable ) -> Boolean ): FlowTestObserver <T > {
186+ /* *
187+ * Asserts that this [FlowTestObserver] received exactly [Flow.catch] event for which
188+ * the provided predicate returns `true`.
189+ */
190+ suspend fun assertError (predicate : (Throwable ) -> Boolean ): FlowTestObserver <T > {
191+
192+ initialize()
89193
90194 val errorNotNull = exceptionNotNull()
91195
92196 if (! predicate(errorNotNull))
93197 throw AssertionError (" Assertion Error! Exception for $errorNotNull " )
198+ return this
199+ }
200+
201+ suspend fun assertNoErrors (): FlowTestObserver <T > {
202+
203+ initialize()
204+
205+ if (error != null )
206+ throw AssertionError (" Assertion Error! Exception occurred $error " )
94207
95208 return this
96209 }
97210
98- fun assertNull (): FlowTestObserver <T > {
211+ suspend fun assertNull (): FlowTestObserver <T > {
99212
100- testValues.forEach {
101- if (it != null ) throw AssertionError (" Assertion Error! There are more than one item that is not nuşş" )
213+ initialize()
102214
215+ testValues.forEach {
216+ if (it != null ) throw AssertionError (
217+ " Assertion Error! " +
218+ " There are more than one item that is not null"
219+ )
103220 }
104221
105222 return this
106223 }
107224
108- fun values (predicate : (List <T >) -> Unit ): FlowTestObserver <T > {
225+ /* *
226+ * Assert that this [FlowTestObserver] received [Flow.onCompletion] event without a [Throwable]
227+ */
228+ suspend fun assertComplete (): FlowTestObserver <T > {
229+
230+ initialize()
231+
232+ if (! isCompleted) throw AssertionError (
233+ " Assertion Error!" +
234+ " Job is not completed or onCompletion called with a error!"
235+ )
236+ return this
237+ }
238+
239+ /* *
240+ * Assert that this [FlowTestObserver] either not received [Flow.onCompletion] event or
241+ * received event with
242+ */
243+ suspend fun assertNotComplete (): FlowTestObserver <T > {
244+
245+ initialize()
246+
247+ if (isCompleted) throw AssertionError (" Assertion Error! Job is completed!" )
248+ return this
249+ }
250+
251+ suspend fun values (predicate : (List <T >) -> Unit ): FlowTestObserver <T > {
109252 predicate(testValues)
110253 return this
111254 }
112255
113- fun values (): List <T > {
256+ suspend fun values (): List <T > {
257+
258+ initialize()
259+
114260 return testValues
115261 }
116262
117263 private fun exceptionNotNull (): Throwable {
118264
119265 if (error == null )
120- throw AssertionError (" There is no exception" )
266+ throw AssertionError (" There is no exception" )
121267
122268 return error!!
123269 }
@@ -127,6 +273,33 @@ class FlowTestObserver<T>(
127273 }
128274}
129275
130- fun <T > Flow<T>.test (scope : CoroutineScope ): FlowTestObserver <T > {
131- return FlowTestObserver (scope, this )
276+ /* *
277+ * Creates a RxJava2 style test observer that uses `onStart`, `onEach`, `onCompletion`
278+ *
279+ * * Set waitForDelay true for testing delay.
280+ *
281+ * ### Note: waiting for delay with a channel that sends values throw TimeoutCancellationException,
282+ * don't use timeout with channel
283+ * TODO Fix channel issue
284+ */
285+ suspend fun <T > Flow<T>.test (
286+ scope : CoroutineScope ,
287+ waitForDelay : Boolean = true
288+ ): FlowTestObserver <T > {
289+
290+ return FlowTestObserver (scope, this @test, waitForDelay)
291+ }
292+
293+ /* *
294+ * Test function that awaits with time out until each delay method is run and then since
295+ * it takes a predicate that runs after a timeout.
296+ */
297+ suspend fun <T > Flow<T>.testAfterDelay (
298+ scope : CoroutineScope ,
299+ predicate : suspend FlowTestObserver <T >.() -> Unit
300+
301+ ): Job {
302+ return scope.launch(coroutineContext) {
303+ FlowTestObserver (this , this @testAfterDelay, true ).predicate()
304+ }
132305}
0 commit comments