-
-
Notifications
You must be signed in to change notification settings - Fork 158
/
Racing.kt
133 lines (122 loc) · 4.94 KB
/
Racing.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
* Copyright 2019 Louis Cognault Ayeva Derman. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("NOTHING_TO_INLINE")
package splitties.coroutines
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import splitties.collections.forEachByIndex
import splitties.experimental.ExperimentalSplittiesApi
import kotlin.experimental.ExperimentalTypeInference
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED as Undispatched
/**
* Pass at least one racer to use [raceOf], or use [race] if the racers need to be launched
* dynamically.
*/
@Suppress("DeprecatedCallableAddReplaceWith", "RedundantSuspendModifier")
@Deprecated("A race needs racers.", level = DeprecationLevel.ERROR) // FOOL GUARD, DO NOT REMOVE
suspend fun <T> raceOf(): T = throw UnsupportedOperationException("A race needs racers.")
/**
* Races all the [racers] concurrently. Once the winner completes, all other racers are cancelled,
* then the value of the winner is returned.
*
* Use [race] if the racers need to be launched dynamically.
*/
@ExperimentalSplittiesApi
suspend fun <T> raceOf(vararg racers: suspend CoroutineScope.() -> T): T {
require(racers.isNotEmpty()) { "A race needs racers." }
return coroutineScope {
@Suppress("RemoveExplicitTypeArguments")
select<T> {
@OptIn(ExperimentalCoroutinesApi::class)
val racersAsyncList = racers.map {
async(start = Undispatched, block = it)
}
racersAsyncList.forEachByIndex { racer: Deferred<T> ->
racer.onAwait { resultOfWinner: T ->
racersAsyncList.forEachByIndex { deferred: Deferred<T> -> deferred.cancel() }
return@onAwait resultOfWinner
}
}
}
}
}
/**
* A scope meant to be used in [race] lambda receiver.
*
* You should not implement this interface yourself.
*/
@ExperimentalSplittiesApi
interface RacingScope<in T> : CoroutineScope {
@Deprecated(
message = "Internal API",
replaceWith = ReplaceWith("launchRacer(block)", "splitties.coroutines.launchRacer")
)
fun launchRacerInternal(block: suspend CoroutineScope.() -> T)
}
/**
* Launches a racer in this scope.
* **Must be cancellable**, it will suspend [race] completion otherwise.
*
* Use it inside the lambda passed to the [race] function.
*/
@ExperimentalSplittiesApi
inline fun <T> RacingScope<T>.launchRacer(noinline block: suspend CoroutineScope.() -> T) {
@Suppress("DEPRECATION")
launchRacerInternal(block)
}
/**
* Starts a [RacingScope] with the suspending [builder] lambda in which you can call [launchRacer]
* each time you want to launch a racer coroutine. Once a racer completes, the [builder] and all
* racers are cancelled, then the value of the winning racer is returned.
*
* For races where the number of racers is static, you can use the slightly more efficient [raceOf]
* function and directly pass the cancellable lambdas you want to race concurrently.
*/
@ExperimentalSplittiesApi
@OptIn(ExperimentalTypeInference::class)
suspend fun <T> race(
@BuilderInference
builder: suspend RacingScope<T>.() -> Unit
): T = coroutineScope {
val racersAsyncList = mutableListOf<Deferred<T>>()
@Suppress("RemoveExplicitTypeArguments")
select<T> {
val builderJob = Job(parent = coroutineContext[Job])
val racingScope = object : RacingScope<T>, CoroutineScope by this@coroutineScope {
var raceWon = false
@Suppress("OverridingDeprecatedMember")
override fun launchRacerInternal(block: suspend CoroutineScope.() -> T) {
if (raceWon) return // A racer already completed.
async(block = block).also { racerAsync ->
racersAsyncList += racerAsync
if (raceWon) { // A racer just completed on another thread, cancel.
racerAsync.cancel()
}
}.onAwait { resultOfWinner: T ->
raceWon = true
builderJob.cancel()
var i = 0
// Since launchRacerInternal might be called on multiple threads concurrently,
// we don't use a forEach loop, but a while loop that is additions tolerant.
while (i <= racersAsyncList.lastIndex) {
val deferred: Deferred<T> = racersAsyncList[i]
deferred.cancel()
i++
}
return@onAwait resultOfWinner
}
}
}
@OptIn(ExperimentalCoroutinesApi::class)
launch(builderJob, start = Undispatched) {
racingScope.builder()
}
}
}