This repository has been archived by the owner on Aug 10, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 568
/
Worker.kt
113 lines (101 loc) · 5.34 KB
/
Worker.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/*
* Copyright 2010-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the LICENSE file.
*/
package kotlin.native.concurrent
import kotlin.native.internal.ExportForCppRuntime
import kotlin.native.internal.Frozen
import kotlin.native.internal.VolatileLambda
import kotlin.native.internal.IntrinsicType
import kotlin.native.internal.TypedIntrinsic
import kotlinx.cinterop.*
/**
* ## Workers: theory of operations.
*
* [Worker] represents asynchronous and concurrent computation, usually performed by other threads
* in the same process. Object passing between workers is performed using transfer operation, so that
* object graph belongs to one worker at the time, but can be disconnected and reconnected as needed.
* See 'Object Transfer Basics' and [TransferMode] for more details on how objects shall be transferred.
* This approach ensures that no concurrent access happens to same object, while data may flow between
* workers as needed.
*/
/**
* Class representing worker.
*/
@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS")
public inline class Worker @PublishedApi internal constructor(val id: Int) {
companion object {
/**
* Start new scheduling primitive, such as thread, to accept new tasks via `execute` interface.
* Typically new worker may be needed for computations offload to another core, for IO it may be
* better to use non-blocking IO combined with more lightweight coroutines.
*
* @param errorReporting controls if an uncaught exceptions in the worker will be printed out
*/
public fun start(errorReporting: Boolean = true): Worker = Worker(startInternal(errorReporting))
/**
* Return the current worker, if known, null otherwise. null value will be returned in the main thread
* or platform thread without an associated worker, non-null - if called inside worker started with
* [Worker.start].
*/
public val current: Worker? get() {
val id = currentInternal()
return if (id != 0) Worker(id) else null
}
/**
* Create worker object from a C pointer.
*
* @param pointer value returned earlier by [Worker.asCPointer]
*/
public fun fromCPointer(pointer: COpaquePointer?) =
if (pointer != null) Worker(pointer.toLong().toInt()) else throw IllegalArgumentException()
}
/**
* Requests termination of the worker.
*
* @param processScheduledJobs controls is we shall wait until all scheduled jobs processed,
* or terminate immediately. If there are jobs to be execucted with [executeAfter] their execution
* is awaited for.
*/
public fun requestTermination(processScheduledJobs: Boolean = true) =
Future<Unit>(requestTerminationInternal(id, processScheduledJobs))
/**
* Plan job for further execution in the worker. Execute is a two-phase operation:
* - first [producer] function is executed, and resulting object and whatever it refers to
* is analyzed for being an isolated object subgraph, if in checked mode.
* - Afterwards, this disconnected object graph and [job] function pointer is being added to jobs queue
* of the selected worker. Note that [job] must not capture any state itself, so that whole state is
* explicitly stored in object produced by [producer]. Scheduled job is being executed by the worker,
* and result of such a execution is being disconnected from worker's object graph. Whoever will consume
* the future, can use result of worker's computations.
*
* @return the future with the computation result of [job]
*/
@Suppress("UNUSED_PARAMETER")
@TypedIntrinsic(IntrinsicType.WORKER_EXECUTE)
public fun <T1, T2> execute(mode: TransferMode, producer: () -> T1, @VolatileLambda job: (T1) -> T2): Future<T2> =
/*
* This function is a magical operation, handled by lowering in the compiler, and replaced with call to
* executeImpl(worker, mode, producer, job)
* but first ensuring that `job` parameter doesn't capture any state.
*/
throw RuntimeException("Shall not be called directly")
/**
* Plan job for further execution in the worker. [operation] parameter must be either frozen, or execution to be
* planned on the current worker. Otherwise [IllegalStateException] will be thrown.
* [afterMicroseconds] defines after how many microseconds delay execution shall happen, 0 means immediately,
* on negative values [IllegalArgumentException] is thrown.
*/
public fun executeAfter(afterMicroseconds: Long = 0, operation: () -> Unit): Unit {
val current = currentInternal()
if (current != id && !operation.isFrozen) throw IllegalStateException("Job for another worker must be frozen")
if (afterMicroseconds < 0) throw IllegalArgumentException("Timeout parameter must be non-negative")
executeAfterInternal(id, operation, afterMicroseconds)
}
override public fun toString(): String = "worker $id"
/**
* Convert worker to a COpaquePointer value that could be passed via native void* pointer.
* Can be used as an argument of [Worker.fromCPointer].
*/
public fun asCPointer() : COpaquePointer? = id.toLong().toCPointer()
}