Skip to content

Commit

Permalink
Specify exception types, add tests on it
Browse files Browse the repository at this point in the history
  • Loading branch information
qwwdfsad committed Apr 16, 2019
1 parent 64be795 commit b550afa
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 4 deletions.
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ public fun <T> flowViaChannel(
bufferSize: Int = 16,
@BuilderInference block: suspend (SendChannel<T>) -> Unit
): Flow<T> {
require(bufferSize >= 0) { "Buffer size should be positive, but was $bufferSize" }
return flow {
coroutineScope {
val channel = Channel<T>(bufferSize)
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/operators/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import kotlinx.coroutines.flow.unsafeFlow as flow
* This operator uses a channel of the specific [bufferSize] in order to switch between contexts,
* but it is not guaranteed that the channel will be created, implementation is free to optimize it away in case of fusing.
*
* @throws [IllegalStateException] if provided context contains [Job] instance.
* @throws [IllegalArgumentException] if provided context contains [Job] instance.
*/
@FlowPreview
public fun <T> Flow<T>.flowOn(flowContext: CoroutineContext, bufferSize: Int = 16): Flow<T> {
Expand Down Expand Up @@ -88,7 +88,7 @@ public fun <T> Flow<T>.flowOn(flowContext: CoroutineContext, bufferSize: Int = 1
* This operator uses channel of the specific [bufferSize] in order to switch between contexts,
* but it is not guaranteed that channel will be created, implementation is free to optimize it away in case of fusing.
*
* @throws [IllegalStateException] if provided context contains [Job] instance.
* @throws [IllegalArgumentException] if provided context contains [Job] instance.
*/
@FlowPreview
public fun <T, R> Flow<T>.flowWith(
Expand Down
5 changes: 4 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Limit.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import kotlin.jvm.*

/**
* Returns a flow that ignores first [count] elements.
* Throws [IllegalArgumentException] if [count] is negative.
*/
@FlowPreview
public fun <T> Flow<T>.drop(count: Int): Flow<T> {
Expand Down Expand Up @@ -45,10 +46,12 @@ public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T> = f
/**
* Returns a flow that contains first [count] elements.
* When [count] elements are consumed, the original flow is cancelled.
* Throws [IllegalArgumentException] if [count] is negative.
*/
@FlowPreview
public fun <T> Flow<T>.take(count: Int): Flow<T> {
require(count > 0) { "Take count should be positive, but had $count" }
require(count >= 0) { "Requested element count $count is less than zero." }
if (count == 0) return emptyFlow()

This comment was marked as resolved.

Copy link
@JakeWharton

JakeWharton Apr 19, 2019

Contributor

I don't think this is safe. It means any side-effects of the upstream are ignored. You still need to trigger the upstream.

This comment was marked as resolved.

Copy link
@elizarov

elizarov Apr 20, 2019

Contributor

Trigger the effects but cancel it when? Wait until it emits first element? That is not consistent with take(1) that cancels right after first emission. I'd rather make take(0) illegal. Frankly, I don't really see a use-case.

This comment was marked as resolved.

Copy link
@JakeWharton

JakeWharton Apr 20, 2019

Contributor

I agree.

return flow {
var consumed = 0
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.test.*

class FlowViaChannelTest : TestBase() {
@Test
fun testRegular() = runTest {
val flow = flowViaChannel<Int> {
it.send(1)
it.send(2)
it.close()
}
assertEquals(listOf(1, 2), flow.toList())
}

@Test
fun testConflated() = runTest {
val flow = flowViaChannel<Int>(bufferSize = Channel.CONFLATED) {
it.send(1)
it.send(2)
it.close()
}
assertEquals(listOf(1), flow.toList())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ class DropTest : TestBase() {
assertEquals(0, flowOf<Int>().drop(1).sum())
}

@Test
fun testNegativeCount() {
assertFailsWith<IllegalArgumentException> {
emptyFlow<Int>().drop(-1)
}
}

@Test
fun testErrorCancelsUpstream() = runTest {
val flow = flow {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.*
import kotlin.test.*

class FlowContextTest : TestBase() {
Expand Down Expand Up @@ -142,4 +143,14 @@ class FlowContextTest : TestBase() {
finish(7)
ensureActive()
}

@Test
fun testIllegalArgumentException() {
val flow = emptyFlow<Int>()
assertFailsWith<IllegalArgumentException> { flow.flowOn(Job()) }
assertFailsWith<IllegalArgumentException> { flow.flowWith(Job()) { this } }
assertFailsWith<IllegalArgumentException> { flow.flowOn(EmptyCoroutineContext, bufferSize = -1) }
assertFailsWith<IllegalArgumentException> { flow.flowWith(EmptyCoroutineContext, bufferSize = -1) { this } }

}
}
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/TakeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ class TakeTest : TestBase() {
assertEquals(0, sum)
}

@Test
fun testNonPositiveValues() = runTest {
val flow = flowOf(1)
assertFailsWith<IllegalArgumentException> {
flow.take(-1)
}

assertNull(flow.take(0).singleOrNull())
}

@Test
fun testCancelUpstream() = runTest {
var cancelled = false
Expand Down

0 comments on commit b550afa

Please sign in to comment.