Skip to content

Commit

Permalink
3789: Update flow.timeout example to re-throw (#3801)
Browse files Browse the repository at this point in the history
Fixes #3789


Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
  • Loading branch information
steve-the-edwards and qwwdfsad committed Jul 20, 2023
1 parent 9b06a69 commit c675e3f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
18 changes: 14 additions & 4 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public fun <T> Flow<T>.debounce(timeout: (T) -> Duration): Flow<T> =
timeout(emittedItem).toDelayMillis()
}

private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow<T> =
private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long): Flow<T> =
scopedFlow { downstream ->
// Produce the values using the default (rendezvous) channel
val values = produce {
Expand Down Expand Up @@ -306,7 +306,10 @@ public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
/*
* TODO this design (and design of the corresponding operator) depends on #540
*/
internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel<Unit> {
internal fun CoroutineScope.fixedPeriodTicker(
delayMillis: Long,
initialDelayMillis: Long = delayMillis
): ReceiveChannel<Unit> {
require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
return produce(capacity = 0) {
Expand Down Expand Up @@ -359,8 +362,15 @@ public fun <T> Flow<T>.sample(period: Duration): Flow<T> = sample(period.toDelay
* emit(3)
* delay(1000)
* emit(4)
* }.timeout(100.milliseconds).catch {
* emit(-1) // Item to emit on timeout
* }.timeout(100.milliseconds).catch { exception ->
* if (exception is TimeoutCancellationException) {
* // Catch the TimeoutCancellationException emitted above.
* // Emit desired item on timeout.
* emit(-1)
* } else {
* // Throw other exceptions.
* throw exception
* }
* }.onEach {
* delay(300) // This will not cause a timeout
* }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@ flow {
emit(3)
delay(1000)
emit(4)
}.timeout(100.milliseconds).catch {
emit(-1) // Item to emit on timeout
}.timeout(100.milliseconds).catch { exception ->
if (exception is TimeoutCancellationException) {
// Catch the TimeoutCancellationException emitted above.
// Emit desired item on timeout.
emit(-1)
} else {
// Throw other exceptions.
throw exception
}
}.onEach {
delay(300) // This will not cause a timeout
}
Expand Down

0 comments on commit c675e3f

Please sign in to comment.