Skip to content

Commit c311ef5

Browse files
committed
Add a libuv-managed event loop thread.
This will be used in timers, async I/O, file system notifications, and so forth. It will deliver completion notifications into a queue - one that is presumably being used as a thread pool input stream.
1 parent 031b779 commit c311ef5

File tree

14 files changed

+235
-9
lines changed

14 files changed

+235
-9
lines changed

build/Makefile.in

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ OBJECTS = src/core/callsite@obj@ \
8282
src/gc/worklist@obj@ \
8383
src/gc/roots@obj@ \
8484
src/io/io@obj@ \
85+
src/io/eventloop@obj@ \
8586
src/io/syncfile@obj@ \
8687
src/io/syncstream@obj@ \
8788
src/io/syncpipe@obj@ \
@@ -132,6 +133,7 @@ OBJECTS = src/core/callsite@obj@ \
132133
src/6model/reprs/ConditionVariable@obj@ \
133134
src/6model/reprs/Semaphore@obj@ \
134135
src/6model/reprs/ConcBlockingQueue@obj@ \
136+
src/6model/reprs/MVMAsyncTask@obj@ \
135137
src/6model/6model@obj@ \
136138
src/6model/bootstrap@obj@ \
137139
src/6model/sc@obj@ \
@@ -186,6 +188,7 @@ HEADERS = src/moar.h \
186188
src/core/continuation.h \
187189
src/core/intcache.h \
188190
src/io/io.h \
191+
src/io/eventloop.h \
189192
src/io/syncfile.h \
190193
src/io/syncstream.h \
191194
src/io/syncpipe.h \
@@ -241,6 +244,7 @@ HEADERS = src/moar.h \
241244
src/6model/reprs/ConditionVariable.h \
242245
src/6model/reprs/Semaphore.h \
243246
src/6model/reprs/ConcBlockingQueue.h \
247+
src/6model/reprs/MVMAsyncTask.h \
244248
src/6model/sc.h \
245249
src/mast/compiler.h \
246250
src/mast/driver.h \

src/6model/bootstrap.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ void MVM_6model_bootstrap(MVMThreadContext *tc) {
576576

577577
/* Create stub BOOTInt, BOOTNum, BOOTStr, BOOTArray, BOOTHash, BOOTCCode,
578578
* BOOTCode, BOOTThread, BOOTIter, BOOTContext, SCRef, Lexotic,
579-
* CallCapture, BOOTIO and BOOTException types. */
579+
* CallCapture, BOOTIO, BOOTException, and BOOTQueue types. */
580580
#define create_stub_boot_type(tc, reprid, slot, makeboolspec, boolspec) do { \
581581
const MVMREPROps *repr = MVM_repr_get_by_id(tc, reprid); \
582582
MVMObject *type = tc->instance->slot = repr->type_object_for(tc, NULL); \
@@ -608,6 +608,7 @@ void MVM_6model_bootstrap(MVMThreadContext *tc) {
608608
create_stub_boot_type(tc, MVM_REPR_ID_MVMMultiCache, boot_types.BOOTMultiCache, 0, MVM_BOOL_MODE_NOT_TYPE_OBJECT);
609609
create_stub_boot_type(tc, MVM_REPR_ID_MVMContinuation, boot_types.BOOTContinuation, 0, MVM_BOOL_MODE_NOT_TYPE_OBJECT);
610610
create_stub_boot_type(tc, MVM_REPR_ID_MVMThread, Thread, 0, MVM_BOOL_MODE_NOT_TYPE_OBJECT);
611+
create_stub_boot_type(tc, MVM_REPR_ID_ConcBlockingQueue, boot_types.BOOTQueue, 0, MVM_BOOL_MODE_NOT_TYPE_OBJECT);
611612

612613
/* Bootstrap the KnowHOW type, giving it a meta-object. */
613614
bootstrap_KnowHOW(tc);
@@ -638,6 +639,7 @@ void MVM_6model_bootstrap(MVMThreadContext *tc) {
638639
meta_objectifier(tc, boot_types.BOOTMultiCache, "BOOTMultiCache");
639640
meta_objectifier(tc, boot_types.BOOTContinuation, "BOOTContinuation");
640641
meta_objectifier(tc, Thread, "Thread");
642+
meta_objectifier(tc, boot_types.BOOTQueue, "BOOTQueue");
641643

642644
/* Create the KnowHOWAttribute type. */
643645
create_KnowHOWAttribute(tc);

src/6model/reprs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ void MVM_repr_initialize_registry(MVMThreadContext *tc) {
215215
register_core_repr(ConditionVariable);
216216
register_core_repr(Semaphore);
217217
register_core_repr(ConcBlockingQueue);
218+
register_core_repr(AsyncTask);
218219

219220
tc->instance->num_reprs = MVM_REPR_CORE_COUNT;
220221
}

src/6model/reprs.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "6model/reprs/ConditionVariable.h"
3737
#include "6model/reprs/Semaphore.h"
3838
#include "6model/reprs/ConcBlockingQueue.h"
39+
#include "6model/reprs/MVMAsyncTask.h"
3940

4041
/* REPR related functions. */
4142
void MVM_repr_initialize_registry(MVMThreadContext *tc);
@@ -82,8 +83,9 @@ const MVMREPROps * MVM_repr_get_by_name(MVMThreadContext *tc, MVMString *name);
8283
#define MVM_REPR_ID_ConditionVariable 34
8384
#define MVM_REPR_ID_Semaphore 35
8485
#define MVM_REPR_ID_ConcBlockingQueue 36
86+
#define MVM_REPR_ID_MVMAsyncTask 37
8587

86-
#define MVM_REPR_CORE_COUNT 37
88+
#define MVM_REPR_CORE_COUNT 38
8789
#define MVM_REPR_MAX_COUNT 64
8890

8991
/* Default attribute functions for a REPR that lacks them. */

src/6model/reprs/MVMAsyncTask.c

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#include "moar.h"
2+
3+
/* This representation's function pointer table. */
4+
static const MVMREPROps this_repr;
5+
6+
/* Creates a new type object of this representation, and associates it with
7+
* the given HOW. */
8+
static MVMObject * type_object_for(MVMThreadContext *tc, MVMObject *HOW) {
9+
MVMSTable *st = MVM_gc_allocate_stable(tc, &this_repr, HOW);
10+
11+
MVMROOT(tc, st, {
12+
MVMObject *obj = MVM_gc_allocate_type_object(tc, st);
13+
MVM_ASSIGN_REF(tc, &(st->header), st->WHAT, obj);
14+
st->size = sizeof(MVMAsyncTask);
15+
});
16+
17+
return st->WHAT;
18+
}
19+
20+
/* Copies the body of one object to another. */
21+
static void copy_to(MVMThreadContext *tc, MVMSTable *st, void *src, MVMObject *dest_root, void *dest) {
22+
MVM_exception_throw_adhoc(tc, "Cannot copy object with repr MVMAsyncTask");
23+
}
24+
25+
static void deserialize_stable_size(MVMThreadContext *tc, MVMSTable *st, MVMSerializationReader *reader) {
26+
st->size = sizeof(MVMAsyncTask);
27+
}
28+
29+
/* Called by the VM to mark any GCable items. */
30+
static void gc_mark(MVMThreadContext *tc, MVMSTable *st, void *data, MVMGCWorklist *worklist) {
31+
MVMAsyncTaskBody *task = (MVMAsyncTaskBody *)data;
32+
MVM_gc_worklist_add(tc, worklist, &task->queue);
33+
MVM_gc_worklist_add(tc, worklist, &task->schedulee);
34+
if (task->ops && task->ops->gc_mark)
35+
task->ops->gc_mark(tc, task->data, worklist);
36+
}
37+
38+
/* Called by the VM in order to free memory associated with this object. */
39+
static void gc_free(MVMThreadContext *tc, MVMObject *obj) {
40+
MVMAsyncTask *task = (MVMAsyncTask *)obj;
41+
if (task->body.ops && task->body.ops->gc_free)
42+
task->body.ops->gc_free(tc, obj, task->body.data);
43+
}
44+
45+
/* Gets the storage specification for this representation. */
46+
static MVMStorageSpec get_storage_spec(MVMThreadContext *tc, MVMSTable *st) {
47+
MVMStorageSpec spec;
48+
spec.inlineable = MVM_STORAGE_SPEC_REFERENCE;
49+
spec.boxed_primitive = MVM_STORAGE_SPEC_BP_NONE;
50+
spec.can_box = 0;
51+
return spec;
52+
}
53+
54+
/* Compose the representation. */
55+
static void compose(MVMThreadContext *tc, MVMSTable *st, MVMObject *info) {
56+
/* Nothing to do for this REPR. */
57+
}
58+
59+
/* Initializes the representation. */
60+
const MVMREPROps * MVMAsyncTask_initialize(MVMThreadContext *tc) {
61+
return &this_repr;
62+
}
63+
64+
static const MVMREPROps this_repr = {
65+
type_object_for,
66+
MVM_gc_allocate_object,
67+
NULL, /* initialize */
68+
copy_to,
69+
MVM_REPR_DEFAULT_ATTR_FUNCS,
70+
MVM_REPR_DEFAULT_BOX_FUNCS,
71+
MVM_REPR_DEFAULT_POS_FUNCS,
72+
MVM_REPR_DEFAULT_ASS_FUNCS,
73+
MVM_REPR_DEFAULT_ELEMS,
74+
get_storage_spec,
75+
NULL, /* change_type */
76+
NULL, /* serialize */
77+
NULL, /* deserialize */
78+
NULL, /* serialize_repr_data */
79+
NULL, /* deserialize_repr_data */
80+
deserialize_stable_size,
81+
gc_mark,
82+
gc_free,
83+
NULL, /* gc_cleanup */
84+
NULL, /* gc_mark_repr_data */
85+
NULL, /* gc_free_repr_data */
86+
compose,
87+
NULL, /* spesh */
88+
"AsyncTask", /* name */
89+
MVM_REPR_ID_MVMAsyncTask,
90+
0, /* refs_frames */
91+
};

src/6model/reprs/MVMAsyncTask.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/* Representation serving as a handle to an asynchronous task. */
2+
struct MVMAsyncTaskBody {
3+
/* The queue to schedule a result handler on. */
4+
MVMObject *queue;
5+
6+
/* The result handler to schedule. */
7+
MVMObject *schedulee;
8+
9+
/* Async task operation table. */
10+
const MVMAsyncTaskOps *ops;
11+
12+
/* Data stored by operation type. */
13+
void *data;
14+
};
15+
struct MVMAsyncTask {
16+
MVMObject common;
17+
MVMAsyncTaskBody body;
18+
};
19+
20+
/* Function for REPR setup. */
21+
const MVMREPROps * MVMAsyncTask_initialize(MVMThreadContext *tc);

src/core/instance.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ struct MVMBootTypes {
2020
MVMObject *BOOTCompUnit;
2121
MVMObject *BOOTMultiCache;
2222
MVMObject *BOOTContinuation;
23+
MVMObject *BOOTQueue;
2324
};
2425

2526
/* Various raw types that don't need a HOW */
@@ -78,9 +79,6 @@ typedef struct _MVMCallsiteProfileData {
7879

7980
/* Represents a MoarVM instance. */
8081
struct MVMInstance {
81-
/* libuv loop */
82-
uv_loop_t *default_loop;
83-
8482
/* The main thread. */
8583
MVMThreadContext *main_thread;
8684

@@ -90,6 +88,15 @@ struct MVMInstance {
9088
/* The number of active user threads. */
9189
MVMuint16 num_user_threads;
9290

91+
/* The event loop thread, a mutex to avoid start-races, a concurrent
92+
* queue of tasks that need to be processed by the event loop thread
93+
* and an array of active tasks, for the purpose of keeping them GC
94+
* marked. */
95+
MVMThreadContext *event_loop_thread;
96+
uv_mutex_t mutex_event_loop_start;
97+
MVMObject *event_loop_todo_queue;
98+
MVMObject *event_loop_active;
99+
93100
/* The KnowHOW meta-object; all other meta-objects (which are
94101
* built in user-space) are built out of this. */
95102
MVMObject *KnowHOW;

src/core/threadcontext.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ MVMThreadContext * MVM_tc_create(MVMInstance *instance) {
3434
tc->frame_pool_table_size = MVMInitialFramePoolTableSize;
3535
tc->frame_pool_table = calloc(MVMInitialFramePoolTableSize, sizeof(MVMFrame *));
3636

37-
tc->loop = instance->default_loop ? uv_loop_new() : uv_default_loop();
37+
/* Use default loop for main thread; create a new one for others. */
38+
tc->loop = instance->main_thread ? uv_loop_new() : uv_default_loop();
3839

3940
/* Initialize random number generator state. */
4041
MVM_proc_seed(tc, (MVM_platform_now() / 10000) * MVM_proc_getpid(tc));

src/gc/roots.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ void MVM_gc_root_add_instance_roots_to_worklist(MVMThreadContext *tc, MVMGCWorkl
4545
MVM_gc_worklist_add(tc, worklist, &tc->instance->compiler_registry);
4646
MVM_gc_worklist_add(tc, worklist, &tc->instance->hll_syms);
4747
MVM_gc_worklist_add(tc, worklist, &tc->instance->clargs);
48+
MVM_gc_worklist_add(tc, worklist, &tc->instance->event_loop_todo_queue);
49+
MVM_gc_worklist_add(tc, worklist, &tc->instance->event_loop_active);
4850

4951
/* okay, so this makes the weak hash slightly less weak.. for certain
5052
* keys of it anyway... */

src/io/eventloop.c

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#include "moar.h"
2+
3+
/* Asynchronous I/O, timers, file system notifications and signal handlers
4+
* have their callbacks processed by this event loop. Its job is mostly to
5+
* fire off work, receive the callbacks, and put stuff into the concurrent
6+
* work queue of some scheduler or other. It's backed by a thread that is
7+
* started in the usual way, but never actually ends up in interpreter;
8+
* instead, it enters a libuv event loop "forever", until program exit.
9+
*
10+
* Work is sent to the event loop by
11+
*/
12+
13+
/* Sets up an async task to be done on the loop. */
14+
MVMint64 setup_work(MVMThreadContext *tc) {
15+
MVMObject *task_obj;
16+
MVMint64 setup = 0;
17+
while ((task_obj = MVM_concblockingqueue_poll(tc,
18+
(MVMConcBlockingQueue *)tc->instance->event_loop_todo_queue)) != NULL) {
19+
MVMAsyncTask *task = (MVMAsyncTask *)task_obj;
20+
task->body.ops->setup(tc, tc->loop, task->body.data);
21+
setup = 1;
22+
}
23+
return setup;
24+
}
25+
26+
/* Sees if we have an event loop processing thread set up already, and
27+
* sets it up if not. */
28+
void idle_handler(uv_idle_t *handle, int status) {
29+
MVMThreadContext *tc = (MVMThreadContext *)handle->data;
30+
GC_SYNC_POINT(tc);
31+
setup_work(tc);
32+
}
33+
void enter_loop(MVMThreadContext *tc, MVMCallsite *callsite, MVMRegister *args) {
34+
uv_idle_t idle;
35+
if (uv_idle_init(tc->loop, &idle) != 0)
36+
MVM_panic(1, "Unable to initialize idle worker for event loop");
37+
idle.data = tc;
38+
if (uv_idle_start(&idle, idle_handler) != 0)
39+
MVM_panic(1, "Unable to start idle worker for event loop");
40+
uv_run(tc->loop, UV_RUN_DEFAULT);
41+
MVM_panic(1, "Supposedly unending event loop thread ended");
42+
}
43+
static uv_loop_t *get_or_vivify_loop(MVMThreadContext *tc) {
44+
if (!tc->instance->event_loop_thread) {
45+
/* Grab starting mutex and ensure we didn't lose the race. */
46+
uv_mutex_lock(&tc->instance->mutex_event_loop_start);
47+
if (!tc->instance->event_loop_thread) {
48+
/* Start the event loop thread, which will call a C function that
49+
* sits in the uv loop, never leaving. */
50+
MVMObject *thread, *loop_runner;
51+
loop_runner = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTCCode);
52+
((MVMCFunction *)loop_runner)->body.func = enter_loop;
53+
MVMROOT(tc, loop_runner, {
54+
thread = MVM_thread_new(tc, loop_runner, 1);
55+
MVM_thread_run(tc, thread);
56+
});
57+
tc->instance->event_loop_thread = ((MVMThread *)thread)->body.tc;
58+
tc->instance->event_loop_todo_queue = MVM_repr_alloc_init(tc,
59+
tc->instance->boot_types.BOOTQueue);
60+
tc->instance->event_loop_active = MVM_repr_alloc_init(tc,
61+
tc->instance->boot_types.BOOTArray);
62+
}
63+
uv_mutex_unlock(&tc->instance->mutex_event_loop_start);
64+
}
65+
return tc->instance->event_loop_thread->loop;
66+
}
67+
68+
/* Adds a work item into the event loop work queue. */
69+
void MVM_io_eventloop_queue_work(MVMThreadContext *tc, MVMObject *work) {
70+
MVMROOT(tc, work, {
71+
uv_loop_t *loop = get_or_vivify_loop(tc);
72+
MVM_repr_push_o(tc, tc->instance->event_loop_todo_queue, work);
73+
});
74+
}

0 commit comments

Comments
 (0)