Skip to content
Permalink
Browse files

More fault-tolerance in the Worker API. (#2451)

  • Loading branch information...
olonho committed Dec 12, 2018
1 parent 2d9332b commit 2e7cfa0b1e93dc72e3cecfd48eaa2a93ce0166a3
@@ -10,13 +10,19 @@ import kotlin.test.*
import kotlin.native.concurrent.*

@Test fun runTest() {
val worker = Worker.start()
val worker = Worker.start(false)
val future = worker.execute(TransferMode.SAFE, { "Input" }) {
input -> println(input)
}
future.consume {
result -> println("Got $result")
}

assertFailsWith<IllegalStateException> {
println(worker.execute(TransferMode.SAFE, { null }, { _ -> throw Error("An error") }).result)
}

worker.requestTermination().result

println("OK")
}
@@ -55,10 +55,11 @@ void RUNTIME_NORETURN ThrowNotImplementedError();
// Throws illegal character conversion exception (used in UTF8/UTF16 conversions).
void RUNTIME_NORETURN ThrowIllegalCharacterConversionException();
void RUNTIME_NORETURN ThrowIllegalArgumentException();
void RUNTIME_NORETURN ThrowIllegalStateException();
void RUNTIME_NORETURN ThrowInvalidMutabilityException(KConstRef where);
void RUNTIME_NORETURN ThrowIncorrectDereferenceException();
void RUNTIME_NORETURN ThrowIllegalObjectSharingException(KConstNativePtr typeInfo, KConstNativePtr address);
// Prints out mesage of Throwable.
// Prints out message of Throwable.
void PrintThrowable(KRef);

#ifdef __cplusplus
@@ -28,6 +28,7 @@
#endif

#include "Alloc.h"
#include "Exceptions.h"
#include "KAssert.h"
#include "Memory.h"
#include "Runtime.h"
@@ -49,14 +50,17 @@ enum {
INVALID = 0,
SCHEDULED = 1,
COMPUTED = 2,
CANCELLED = 3
CANCELLED = 3,
THROWN = 4
};

enum {
CHECKED = 0,
UNCHECKED = 1
};

THREAD_LOCAL_VARIABLE KInt g_currentWorkerId = 0;

KNativePtr transfer(KRef object, KInt mode) {
switch (mode) {
case CHECKED:
@@ -102,12 +106,15 @@ class Future {
while (state_ == SCHEDULED) {
pthread_cond_wait(&cond_, &lock_);
}
// TODO: maybe use message from exception?
if (state_ == THROWN)
ThrowIllegalStateException();
auto result = AdoptStablePointer(result_, OBJ_RESULT);
result_ = nullptr;
return result;
}

void storeResultUnlocked(KNativePtr result);
void storeResultUnlocked(KNativePtr result, bool ok);

void cancelUnlocked();

@@ -136,7 +143,7 @@ struct Job {

class Worker {
public:
Worker(KInt id) : id_(id) {
Worker(KInt id, bool errorReporting) : id_(id), errorReporting_(errorReporting) {
pthread_mutex_init(&lock_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
@@ -173,12 +180,15 @@ class Worker {

KInt id() const { return id_; }

bool errorReporting() const { return errorReporting_; }

private:
KInt id_;
KStdDeque<Job> queue_;
// Lock and condition for waiting on the queue.
pthread_mutex_t lock_;
pthread_cond_t cond_;
bool errorReporting_;
};

class State {
@@ -198,9 +208,9 @@ class State {
pthread_cond_destroy(&cond_);
}

Worker* addWorkerUnlocked() {
Worker* addWorkerUnlocked(bool errorReporting) {
Locker locker(&lock_);
Worker* worker = konanConstructInstance<Worker>(nextWorkerId());
Worker* worker = konanConstructInstance<Worker>(nextWorkerId(), errorReporting);
if (worker == nullptr) return nullptr;
workers_[worker->id()] = worker;
return worker;
@@ -332,14 +342,14 @@ State* theState() {
return state;
}

void Future::storeResultUnlocked(KNativePtr result) {
void Future::storeResultUnlocked(KNativePtr result, bool ok) {
{
Locker locker(&lock_);
state_ = COMPUTED;
state_ = ok ? COMPUTED : THROWN;
result_ = result;
// Beware here: although manual clearly says that pthread_cond_signal() could be called outside
// of the taken lock, it's not on OSX (as of 10.13.1). If moved outside of the lock,
// some notifications gets missed.
// of the taken lock, it's not on macOS (as of 10.13.1). If moved outside of the lock,
// some notifications are missing.
pthread_cond_signal(&cond_);
}
theState()->signalAnyFuture();
@@ -361,12 +371,14 @@ extern "C" void ReportUnhandledException(KRef e);
void* workerRoutine(void* argument) {
Worker* worker = reinterpret_cast<Worker*>(argument);

g_currentWorkerId = worker->id();
Kotlin_initRuntimeIfNeeded();

while (true) {
Job job = worker->getJob();
if (job.function == nullptr) {
// Termination request, notify the future.
job.future->storeResultUnlocked(nullptr);
job.future->storeResultUnlocked(nullptr, true);
theState()->removeWorkerUnlocked(worker->id());
break;
}
@@ -377,16 +389,19 @@ void* workerRoutine(void* argument) {
// It is so, as ownership is transferred.
KRef resultRef = nullptr;
KNativePtr result = nullptr;
bool ok = true;
try {
job.function(argument, &resultRef);
argumentHolder.clear();
// Transfer the result.
result = transfer(resultRef, job.transferMode);
} catch (ObjHolder& e) {
ReportUnhandledException(e.obj());
ok = false;
if (worker->errorReporting())
ReportUnhandledException(e.obj());
}
// Notify the future.
job.future->storeResultUnlocked(result);
job.future->storeResultUnlocked(result, ok);
}

Kotlin_deinitRuntimeIfNeeded();
@@ -396,14 +411,18 @@ void* workerRoutine(void* argument) {
return nullptr;
}

KInt startWorker() {
Worker* worker = theState()->addWorkerUnlocked();
KInt startWorker(KBoolean errorReporting) {
Worker* worker = theState()->addWorkerUnlocked(errorReporting != 0);
if (worker == nullptr) return -1;
pthread_t thread = 0;
pthread_create(&thread, nullptr, workerRoutine, worker);
return worker->id();
}

KInt currentWorker() {
return g_currentWorkerId;
}

KInt schedule(KInt id, KInt transferMode, KRef producer, KNativePtr jobFunction) {
Job job;
// Note that this is a bit hacky, as we must not auto-release jobArgumentRef,
@@ -454,7 +473,7 @@ KNativePtr detachObjectGraphInternal(KInt transferMode, KRef producer) {

#else

KInt startWorker() {
KInt startWorker(KBoolean errorReporting) {
ThrowWorkerUnsupported();
return -1;
}
@@ -469,6 +488,11 @@ KInt schedule(KInt id, KInt transferMode, KRef producer, KNativePtr jobFunction)
return 0;
}

KInt currentWorker() {
ThrowWorkerUnsupported();
return 0;
}

OBJ_GETTER(consumeFuture, KInt id) {
ThrowWorkerUnsupported();
RETURN_OBJ(nullptr);
@@ -505,8 +529,12 @@ KNativePtr detachObjectGraphInternal(KInt transferMode, KRef producer) {

extern "C" {

KInt Kotlin_Worker_startInternal() {
return startWorker();
KInt Kotlin_Worker_startInternal(KBoolean noErrorReporting) {
return startWorker(noErrorReporting);
}

KInt Kotlin_Worker_currentInternal() {
return currentWorker();
}

KInt Kotlin_Worker_requestTerminationWorkerInternal(KInt id, KBoolean processScheduledJobs) {
@@ -17,31 +17,40 @@ enum class FutureState(val value: Int) {
/** Future result is computed. */
COMPUTED(2),
/** Future is cancelled. */
CANCELLED(3)
CANCELLED(3),
/** Computation thrown an exception. */
THROWN(4)
}

/**
* Class representing abstract computation, whose result may become available in the future.
*/
@Frozen
public class Future<T> internal constructor(val id: Int) {
@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS")
public inline class Future<T> @PublishedApi internal constructor(val id: Int) {
/**
* Blocks execution until the future is ready.
*
* @return the execution result of [code] consumed futures's computaiton
* @throws IllegalStateException if current future has [FutureState.INVALID] or [FutureState.CANCELLED] state
* @return the execution result of [code] consumed future's computaiton
* @throws IllegalStateException if future is in [FutureState.INVALID], [FutureState.CANCELLED] or
* [FutureState.THROWN] state
*/
public inline fun <R> consume(code: (T) -> R): R =
when (state) {
FutureState.SCHEDULED, FutureState.COMPUTED -> {
val value = @Suppress("UNCHECKED_CAST", "NON_PUBLIC_CALL_FROM_PUBLIC_INLINE") (consumeFuture(id) as T)
code(value)
}
FutureState.INVALID ->
throw IllegalStateException("Future is in an invalid state: $state")
FutureState.CANCELLED ->
throw IllegalStateException("Future is cancelled")
public inline fun <R> consume(code: (T) -> R): R = when (state) {
FutureState.SCHEDULED, FutureState.COMPUTED -> {
val value = @Suppress("UNCHECKED_CAST", "NON_PUBLIC_CALL_FROM_PUBLIC_INLINE")
(consumeFuture(id) as T)
code(value)
}
FutureState.INVALID ->
throw IllegalStateException("Future is in an invalid state")
FutureState.CANCELLED -> {
consumeFuture(id)
throw IllegalStateException("Future is cancelled")
}
FutureState.THROWN -> {
consumeFuture(id)
throw IllegalStateException("Job has thrown an exception")
}
}

/**
* The result of the future computation.
@@ -56,10 +65,6 @@ public class Future<T> internal constructor(val id: Int) {
public val state: FutureState
get() = FutureState.values()[stateOfFuture(id)]

public override fun equals(other: Any?): Boolean = (other is Future<*>) && (id == other.id)

public override fun hashCode(): Int = id

override public fun toString(): String = "future $id"
}

@@ -30,7 +30,10 @@ internal fun executeImpl(worker: Worker, mode: TransferMode, producer: () -> Any
Future<Any?>(executeInternal(worker.id, mode.value, producer, job))

@SymbolName("Kotlin_Worker_startInternal")
external internal fun startInternal(): Int
external internal fun startInternal(errorReporting: Boolean): Int

@SymbolName("Kotlin_Worker_currentInternal")
external internal fun currentInternal(): Int

@SymbolName("Kotlin_Worker_requestTerminationWorkerInternal")
external internal fun requestTerminationInternal(id: Int, processScheduledJobs: Boolean): Int
@@ -24,15 +24,27 @@ import kotlinx.cinterop.*
/**
* Class representing worker.
*/
@Frozen
public class Worker private constructor(val id: Int) {
@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS")
public inline class Worker @PublishedApi internal constructor(val id: Int) {
companion object {
/**
* Start new scheduling primitive, such as thread, to accept new tasks via `execute` interface.
* Typically new worker may be needed for computations offload to another core, for IO it may be
* better to use non-blocking IO combined with more lightweight coroutines.
*
* @param errorReporting controls if an uncaught exceptions in the worker will be printed out
*/
public fun start(): Worker = Worker(startInternal())
public fun start(errorReporting: Boolean = true): Worker = Worker(startInternal(errorReporting))

/**
* Return the current worker, if known, null otherwise. null value will be returned in the main thread
* or platform thread without an associated worker, non-null - if called inside worker started with
* [Worker.start].
*/
public val current: Worker? get() {
val id = currentInternal()
return if (id != 0) Worker(id) else null
}
}

/**
@@ -65,9 +77,5 @@ public class Worker private constructor(val id: Int) {
*/
throw RuntimeException("Shall not be called directly")

override public fun equals(other: Any?): Boolean = (other is Worker) && (id == other.id)

override public fun hashCode(): Int = id

override public fun toString(): String = "worker $id"
}
@@ -60,6 +60,11 @@ internal fun ThrowIllegalArgumentException() : Nothing {
throw IllegalArgumentException()
}

@ExportForCppRuntime
internal fun ThrowIllegalStateException() : Nothing {
throw IllegalStateException()
}

@ExportForCppRuntime
internal fun ThrowNotImplementedError(): Nothing {
throw NotImplementedError("An operation is not implemented.")

0 comments on commit 2e7cfa0

Please sign in to comment.
You can’t perform that action at this time.