This repository has been archived by the owner on Feb 9, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 24
/
CoroutineWorker.kt
180 lines (156 loc) · 5.57 KB
/
CoroutineWorker.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package com.autodesk.coroutineworker
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext
import kotlin.native.concurrent.AtomicInt
import kotlin.native.concurrent.freeze
/**
 * Handle to a unit of background work that was submitted through [execute]
 * (or internally by [withContext]). The handle only exposes cancellation:
 * [cancel] raises a flag that the running work item polls, and
 * [cancelAndJoin] additionally waits until the work item reports completion.
 */
public actual class CoroutineWorker internal actual constructor() {

    /**
     * Cancelled/completed flags shared between this handle and the work item
     * enqueued for it (see `executeInternal`).
     */
    private val state = CoroutineWorkerState()

    // Freeze the instance (Kotlin/Native) once `state` has been initialised.
    init { freeze() }

    /**
     * Requests cancellation of the job if it has not completed yet.
     * Does not wait for the job to actually stop.
     */
    public actual fun cancel() {
        cancelIfRunning()
    }

    /**
     * Raises the cancelled flag unless the job already completed.
     *
     * @return `true` if the flag was raised, `false` if the job had already
     * completed and nothing was done.
     */
    private fun cancelIfRunning(): Boolean {
        val stillRunning = !state.completed
        if (stillRunning) {
            // signal the work item that it should cancel itself
            state.cancelled = true
        }
        return stillRunning
    }

    /**
     * Requests cancellation and then suspends until the job reports completion.
     * Returns right away when the job had already completed.
     */
    public actual suspend fun cancelAndJoin() {
        if (cancelIfRunning()) {
            // keep re-checking (through waitAndDelayForCondition) until the completed flag is set
            waitAndDelayForCondition { state.completed }
        }
    }

    public actual companion object {

        /**
         * Executor onto which every work item created by this class is enqueued.
         * NOTE(review): the argument 4 is presumably the number of workers in the
         * pool - confirm against BackgroundCoroutineWorkQueueExecutor.
         */
        private val executor = BackgroundCoroutineWorkQueueExecutor<WorkItem>(4)

        /**
         * Number of workers the shared executor currently reports as active.
         * Handy in tests for verifying that no worker is left running between
         * test cases.
         */
        public val numActiveWorkers: Int
            get() = executor.numActiveWorkers

        /**
         * Enqueues [block] as non-IO work on the shared executor.
         *
         * @return the [CoroutineWorker] handle that can be used to cancel the job.
         */
        public actual fun execute(block: suspend CoroutineScope.() -> Unit): CoroutineWorker =
            executeInternal(false, block)

        /**
         * Runs [block] and returns its result to the suspended caller.
         *
         * The work counts as IO work when [jvmContext] equals `IODispatcher`. IO work
         * runs directly in a `coroutineScope` on the caller whenever the executor says
         * it should be performed inline; everything else is enqueued on the executor.
         *
         * @param jvmContext only compared against `IODispatcher` here.
         * @param block the work to perform.
         * @return whatever [block] returns. For the enqueued path the outcome
         * (value or failure) is captured with `runCatching` and handed to the
         * completion callback supplied by `threadSafeSuspendCallback`.
         */
        public actual suspend fun <T> withContext(jvmContext: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
            val ioWork = jvmContext == IODispatcher
            val runInline = ioWork && BackgroundCoroutineWorkQueueExecutor.shouldPerformIoWorkInline()
            if (!runInline) {
                return threadSafeSuspendCallback<T> { completion ->
                    val worker = executeInternal(ioWork) {
                        // runCatching captures every Throwable, so completion is always invoked
                        completion(runCatching { block() })
                    }
                    // hand back a hook that cancels the enqueued job
                    // (presumably invoked when the caller is cancelled - confirm in threadSafeSuspendCallback)
                    return@threadSafeSuspendCallback { worker.cancel() }
                }
            }
            return coroutineScope(block)
        }

        /**
         * Creates a new [CoroutineWorker], wires its state into a [WorkItem] and
         * enqueues that item on the shared executor.
         *
         * @param isIoWork forwarded to the executor together with the item.
         * @param block the work the item will run.
         * @return the freshly created handle.
         */
        private fun executeInternal(isIoWork: Boolean, block: suspend CoroutineScope.() -> Unit): CoroutineWorker {
            val worker = CoroutineWorker()
            // the lambdas below capture only the state object, not the worker itself
            val workerState = worker.state
            val item = WorkItem(
                { workerState.cancelled },
                { workerState.completed = true },
                block
            )
            executor.enqueueWork(item, isIoWork)
            return worker
        }

        /**
         * Work item that runs [block] while watching for a cancellation request.
         *
         * @property cancelled returns `true` once cancellation has been requested.
         * @property notifyCompletion invoked exactly when the work finishes, whether
         * it succeeded, failed or was cancelled (it is called from a `finally`).
         * @property block the work to run.
         */
        private class WorkItem(
            val cancelled: () -> Boolean,
            val notifyCompletion: () -> Unit,
            val block: suspend CoroutineScope.() -> Unit
        ) : CoroutineWorkItem {

            override val work: suspend CoroutineScope.() -> Unit = {
                // flipped in `finally` so the cancellation watcher knows when to stop
                var finished = false
                try {
                    repeatedlyCheckForCancellation(this.coroutineContext, cancelled) { finished }
                    // run inside a fresh CoroutineScope, so that child jobs are cancelled with it
                    coroutineScope {
                        block()
                    }
                } finally {
                    finished = true
                    notifyCompletion()
                }
            }

            /**
             * Launches a watcher coroutine in the receiving scope. On every check it
             * asks [cancelled]; when that is `true` it cancels [context]. The watcher
             * stops once either [completedGetter] or [cancelled] reported `true`.
             */
            private fun CoroutineScope.repeatedlyCheckForCancellation(
                context: CoroutineContext,
                cancelled: () -> Boolean,
                completedGetter: () -> Boolean
            ) {
                launch {
                    waitAndDelayForCondition {
                        val shouldCancel = cancelled()
                        if (shouldCancel) {
                            context.cancel()
                        }
                        completedGetter() || shouldCancel
                    }
                }
            }
        }
    }
}
/**
 * Holds the `cancelled` and `completed` flags of a job as two bits of a
 * single [AtomicInt]; every write goes through a compare-and-set retry loop.
 */
private class CoroutineWorkerState {

    /**
     * Bit field backing both flags (see [CANCELLED_BIT] and [COMPLETED_BIT]).
     */
    private val bits = AtomicInt(0)

    // Freeze the instance (Kotlin/Native) after the backing field has been assigned.
    init { freeze() }

    /**
     * `true` when cancellation of the job has been requested.
     */
    var cancelled: Boolean
        get() = isSet(CANCELLED_BIT)
        set(flag) = updateValue(CANCELLED_BIT, flag)

    /**
     * `true` when the job has finished.
     */
    var completed: Boolean
        get() = isSet(COMPLETED_BIT)
        set(flag) = updateValue(COMPLETED_BIT, flag)

    /**
     * Sets ([set] = `true`) or clears ([set] = `false`) [bit] in the bit field,
     * retrying the compare-and-set until it succeeds.
     */
    private fun updateValue(bit: Int, set: Boolean) {
        while (true) {
            val current = bits.value
            val updated = if (set) current or bit else current and bit.inv()
            if (bits.compareAndSet(current, updated)) {
                return
            }
        }
    }

    /**
     * Tells whether every bit of [bit] is currently set in the bit field.
     */
    private fun isSet(bit: Int): Boolean {
        val masked = bits.value and bit
        return masked == bit
    }

    companion object {
        /**
         * Mask of the "cancelled" flag.
         */
        private const val CANCELLED_BIT = 1

        /**
         * Mask of the "completed" flag.
         */
        private const val COMPLETED_BIT = 2
    }
}