Skip to content

Commit

Permalink
[IO] Don't wait until the event loop thread starts
Browse files Browse the repository at this point in the history
Because we now initialize all (public) event loop communication in the
requesting thread (rather than in the started thread itself), we don't
need to wait for it to be setup correctly.
  • Loading branch information
bdw committed Sep 15, 2018
1 parent d99fef9 commit 907875a
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 81 deletions.
3 changes: 1 addition & 2 deletions src/core/instance.h
Expand Up @@ -331,10 +331,9 @@ struct MVMInstance {
* queue of tasks that need to be processed by the event loop thread
* and an array of active tasks, for the purpose of keeping them GC
* marked. */
MVMThreadContext *event_loop_thread;
MVMObject *event_loop_thread;
uv_loop_t *event_loop;
uv_mutex_t mutex_event_loop;
uv_sem_t sem_event_loop_started;
MVMObject *event_loop_todo_queue;
MVMObject *event_loop_permit_queue;
MVMObject *event_loop_cancel_queue;
Expand Down
6 changes: 4 additions & 2 deletions src/instrument/crossthreadwrite.c
Expand Up @@ -149,9 +149,11 @@ static MVMint64 filtered_out(MVMThreadContext *tc, MVMObject *written) {
return 1;

/* Write on object from event loop thread is usually shift of invokable. */
if (tc->instance->event_loop_thread)
if (written->header.owner == tc->instance->event_loop_thread->thread_id)
if (tc->instance->event_loop_thread) {
MVMThread *thread = (MVMThread*)tc->instance->event_loop_thread;
if (thread != NULL && written->header.owner == thread->body.tc->thread_id)
return 1;
}

/* Filter out writes to Sub and Method, since these are almost always just
* multi-dispatch caches. */
Expand Down
136 changes: 59 additions & 77 deletions src/io/eventloop.c
Expand Up @@ -91,95 +91,78 @@ static void enter_loop(MVMThreadContext *tc, MVMCallsite *callsite, MVMRegister
/* Bind the thread context for the wakeup signal */
async->data = tc;


/* Signal that the event loop is ready for processing. */
uv_sem_post(&(tc->instance->sem_event_loop_started));

/* Enter event loop */
uv_run(loop, UV_RUN_DEFAULT);

/* Allow the cleanup of threadcontext to process the (redundant) loop */
tc->loop = tc->loop;
}

/* Sees if we have an event loop processing thread set up already, and
* sets it up if not. */
static uv_loop_t *get_or_vivify_loop(MVMThreadContext *tc) {
void MVM_io_event_loop_start(MVMThreadContext *tc) {
MVMInstance *instance = tc->instance;
MVMObject *loop_runner;
unsigned int interval_id;
int r;

if (!instance->event_loop_thread) {
/* Grab starting mutex and ensure we didn't lose the race. */
MVM_telemetry_timestamp(tc, "hoping to start an event loop thread");
MVM_gc_mark_thread_blocked(tc);
uv_mutex_lock(&instance->mutex_event_loop);
MVM_gc_mark_thread_unblocked(tc);
if (!instance->event_loop_thread) {
MVMObject *thread, *loop_runner;
int r;
unsigned int interval_id;

interval_id = MVM_telemetry_interval_start(tc, "creating the event loop thread");

/* Create various bits of state the async event loop thread needs. */
instance->event_loop_todo_queue = MVM_repr_alloc_init(tc,
instance->boot_types.BOOTQueue);
instance->event_loop_permit_queue = MVM_repr_alloc_init(tc,
instance->boot_types.BOOTQueue);
instance->event_loop_cancel_queue = MVM_repr_alloc_init(tc,
instance->boot_types.BOOTQueue);
instance->event_loop_active = MVM_repr_alloc_init(tc,
instance->boot_types.BOOTArray);

/* We need to wait until we know the event loop has started; we'll
* use a semaphore for this purpose. */
if ((r = uv_sem_init(&(instance->sem_event_loop_started), 0)) < 0) {
uv_mutex_unlock(&instance->mutex_event_loop);
MVM_exception_throw_adhoc(tc, "Failed to initialize event loop start semaphore: %s",
uv_strerror(r));
}
if (instance->event_loop_thread)
return;

/* The underlying loop structure that will handle all IO events. */
instance->event_loop = MVM_malloc(sizeof(uv_loop_t));
if (uv_loop_init(instance->event_loop) < 0)
MVM_panic(1, "Unable to initialize event loop");

/* The async signal handler for waking up the thread */
instance->event_loop_wakeup = MVM_malloc(sizeof(uv_async_t));
if (uv_async_init(instance->event_loop, instance->event_loop_wakeup, async_handler) != 0)
MVM_panic(1, "Unable to initialize async wake-up handle for event loop");

/* Start the event loop thread, which will call a C function that
* sits in the uv loop, never leaving until it is stopped from the
* outside */
loop_runner = MVM_repr_alloc_init(tc, instance->boot_types.BOOTCCode);
((MVMCFunction *)loop_runner)->body.func = enter_loop;
thread = MVM_thread_new(tc, loop_runner, 1);

MVMROOT(tc, thread, {
MVM_thread_run(tc, thread);

/* Block until we know it's fully started and initialized
* (not sure this is necesary anymore) */
MVM_gc_mark_thread_blocked(tc);
uv_sem_wait(&(instance->sem_event_loop_started));
MVM_gc_mark_thread_unblocked(tc);
uv_sem_destroy(&(instance->sem_event_loop_started));

/* Make the started event loop thread visible to others. */
instance->event_loop_thread = ((MVMThread *)thread)->body.tc;
});
/* Grab starting mutex and ensure we didn't lose the race. */
MVM_telemetry_timestamp(tc, "hoping to start an event loop thread");
MVM_gc_mark_thread_blocked(tc);
uv_mutex_lock(&instance->mutex_event_loop);
MVM_gc_mark_thread_unblocked(tc);

MVM_telemetry_interval_stop(tc, interval_id, "created the event loop thread");
}
uv_mutex_unlock(&instance->mutex_event_loop);
}
/* We may have lost the race, in which case we don't do anything */
if (instance->event_loop_thread)
goto unlock;

interval_id = MVM_telemetry_interval_start(tc, "creating the event loop thread");

return instance->event_loop;
/* Create various bits of state the async event loop thread needs. */
instance->event_loop_todo_queue = MVM_repr_alloc_init(tc,
instance->boot_types.BOOTQueue);
instance->event_loop_permit_queue = MVM_repr_alloc_init(tc,
instance->boot_types.BOOTQueue);
instance->event_loop_cancel_queue = MVM_repr_alloc_init(tc,
instance->boot_types.BOOTQueue);
instance->event_loop_active = MVM_repr_alloc_init(tc,
instance->boot_types.BOOTArray);

/* The underlying loop structure that will handle all IO events. */
instance->event_loop = MVM_malloc(sizeof(uv_loop_t));
if (uv_loop_init(instance->event_loop) < 0)
MVM_panic(1, "Unable to initialize event loop");

/* The async signal handler for waking up the thread */
instance->event_loop_wakeup = MVM_malloc(sizeof(uv_async_t));
if (uv_async_init(instance->event_loop, instance->event_loop_wakeup, async_handler) != 0)
MVM_panic(1, "Unable to initialize async wake-up handle for event loop");

/* Start the event loop thread, which will call a C function that
* sits in the uv loop, never leaving until it is stopped from the
* outside */
loop_runner = MVM_repr_alloc_init(tc, instance->boot_types.BOOTCCode);
((MVMCFunction *)loop_runner)->body.func = enter_loop;

instance->event_loop_thread = MVM_thread_new(tc, loop_runner, 1);

MVM_thread_run(tc, instance->event_loop_thread);

MVM_telemetry_interval_stop(tc, interval_id, "created the event loop thread");

unlock:
uv_mutex_unlock(&instance->mutex_event_loop);
}



/* Adds a work item into the event loop work queue. */
void MVM_io_eventloop_queue_work(MVMThreadContext *tc, MVMObject *work) {
MVMROOT(tc, work, {
get_or_vivify_loop(tc);
MVM_io_event_loop_start(tc);
MVM_repr_push_o(tc, tc->instance->event_loop_todo_queue, work);
uv_async_send(tc->instance->event_loop_wakeup);
});
Expand All @@ -203,7 +186,7 @@ void MVM_io_eventloop_permit(MVMThreadContext *tc, MVMObject *task_obj,
MVM_repr_push_o(tc, arr, task_obj);
MVM_repr_push_o(tc, arr, channel_box);
MVM_repr_push_o(tc, arr, permits_box);
get_or_vivify_loop(tc);
MVM_io_event_loop_start(tc);
MVM_repr_push_o(tc, tc->instance->event_loop_permit_queue, arr);
uv_async_send(tc->instance->event_loop_wakeup);
});
Expand All @@ -226,7 +209,7 @@ void MVM_io_eventloop_cancel_work(MVMThreadContext *tc, MVMObject *task_obj,
notify_schedulee);
}
MVMROOT(tc, task_obj, {
get_or_vivify_loop(tc);
MVM_io_event_loop_start(tc);
MVM_repr_push_o(tc, tc->instance->event_loop_cancel_queue, task_obj);
uv_async_send(tc->instance->event_loop_wakeup);
});
Expand Down Expand Up @@ -285,21 +268,20 @@ void MVM_io_eventloop_remove_active_work(MVMThreadContext *tc, int *work_idx_to_
/* Stop active event loop if present */
void MVM_io_eventloop_stop(MVMThreadContext *tc) {
MVMInstance *instance = tc->instance;
MVMThreadContext *event_loop_thread = instance->event_loop_thread;
MVMThread *event_loop_thread = (MVMThread*)instance->event_loop_thread;
if (!event_loop_thread)
return;

MVM_gc_mark_thread_blocked(tc);
uv_mutex_lock(&instance->mutex_event_loop);
MVM_gc_mark_thread_unblocked(tc);

event_loop_thread = instance->event_loop_thread;
if (event_loop_thread) {
if (instance->event_loop_thread) {
/* Stop the loop */
uv_stop(instance->event_loop);
uv_async_send(instance->event_loop_wakeup);

MVM_thread_join(tc, (MVMObject*)event_loop_thread->thread_obj);
MVM_thread_join(tc, instance->event_loop_thread);

/* release allocated resources */
uv_close((uv_handle_t*)instance->event_loop_wakeup, NULL);
Expand Down
2 changes: 2 additions & 0 deletions src/io/eventloop.h
Expand Up @@ -28,4 +28,6 @@ void MVM_io_eventloop_send_cancellation_notification(MVMThreadContext *tc, MVMAs
int MVM_io_eventloop_add_active_work(MVMThreadContext *tc, MVMObject *async_task);
MVMAsyncTask * MVM_io_eventloop_get_active_work(MVMThreadContext *tc, int work_idx);
void MVM_io_eventloop_remove_active_work(MVMThreadContext *tc, int *work_idx_to_clear);

void MVM_io_eventloop_start(MVMThreadContext *tc);
void MVM_io_eventloop_stop(MVMThreadContext *tc);

0 comments on commit 907875a

Please sign in to comment.