Merge pull request #506 from MoarVM/inactivate-async-tasks
Inactivate async tasks
jnthn committed Jan 21, 2017
2 parents 5971948 + 2b01549 commit 8193e8e
Showing 8 changed files with 94 additions and 51 deletions.
38 changes: 19 additions & 19 deletions src/io/asyncsocket.c
@@ -36,8 +36,7 @@ static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
ReadInfo *ri = (ReadInfo *)handle->data;
MVMThreadContext *tc = ri->tc;
MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
- MVMAsyncTask *t = (MVMAsyncTask *)MVM_repr_at_pos_o(tc,
-     tc->instance->event_loop_active, ri->work_idx);
+ MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, ri->work_idx);
MVM_repr_push_o(tc, arr, t->body.schedulee);
if (nread >= 0) {
MVMROOT(tc, t, {
@@ -83,6 +82,7 @@ static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
if (buf->base)
MVM_free(buf->base);
uv_read_stop(handle);
+ MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
}
else {
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
@@ -99,6 +99,7 @@ static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
if (buf->base)
MVM_free(buf->base);
uv_read_stop(handle);
+ MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
}
MVM_repr_push_o(tc, t->body.queue, arr);
}
@@ -111,8 +112,7 @@ static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_t
/* Add to work in progress. */
ReadInfo *ri = (ReadInfo *)data;
ri->tc = tc;
- ri->work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active);
- MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task);
+ ri->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);

/* Start reading the stream. */
handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data;
@@ -134,6 +134,7 @@ static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_t
});
MVM_repr_push_o(tc, t->body.queue, arr);
});
+ MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
}
}

@@ -262,8 +263,7 @@ static void on_write(uv_write_t *req, int status) {
WriteInfo *wi = (WriteInfo *)req->data;
MVMThreadContext *tc = wi->tc;
MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
- MVMAsyncTask *t = (MVMAsyncTask *)MVM_repr_at_pos_o(tc,
-     tc->instance->event_loop_active, wi->work_idx);
+ MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, wi->work_idx);
MVM_repr_push_o(tc, arr, t->body.schedulee);
if (status >= 0) {
MVMROOT(tc, arr, {
@@ -292,6 +292,7 @@ static void on_write(uv_write_t *req, int status) {
if (wi->str_data)
MVM_free(wi->buf.base);
MVM_free(wi->req);
+ MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
}

/* Does setup work for an asynchronous write. */
@@ -303,8 +304,7 @@ static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_
/* Add to work in progress. */
WriteInfo *wi = (WriteInfo *)data;
wi->tc = tc;
- wi->work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active);
- MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task);
+ wi->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);

/* Encode the string, or extract buf data. */
if (wi->str_data) {
@@ -347,6 +347,7 @@ static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_
/* Cleanup handle. */
MVM_free(wi->req);
wi->req = NULL;
+ MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
}
}

@@ -526,13 +527,12 @@ typedef struct {
int work_idx;
} ConnectInfo;

- /* When a connection takes place, need to keep the appropriate promise. */
+ /* When a connection takes place, need to send result. */
static void on_connect(uv_connect_t* req, int status) {
ConnectInfo *ci = (ConnectInfo *)req->data;
MVMThreadContext *tc = ci->tc;
MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
- MVMAsyncTask *t = (MVMAsyncTask *)MVM_repr_at_pos_o(tc,
-     tc->instance->event_loop_active, ci->work_idx);
+ MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, ci->work_idx);
MVM_repr_push_o(tc, arr, t->body.schedulee);
if (status >= 0) {
/* Allocate and set up handle. */
@@ -562,6 +562,7 @@ static void on_connect(uv_connect_t* req, int status) {
}
MVM_repr_push_o(tc, t->body.queue, arr);
MVM_free(req);
+ MVM_io_eventloop_remove_active_work(tc, &(ci->work_idx));
}

/* Initialize the connection on the event loop. */
@@ -571,8 +572,7 @@ static void connect_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *asyn
/* Add to work in progress. */
ConnectInfo *ci = (ConnectInfo *)data;
ci->tc = tc;
- ci->work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active);
- MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task);
+ ci->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);

/* Create and initialize socket and connection. */
ci->socket = MVM_malloc(sizeof(uv_tcp_t));
@@ -601,6 +601,7 @@ static void connect_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *asyn
ci->connect = NULL;
uv_close((uv_handle_t *)ci->socket, free_on_close_cb);
ci->socket = NULL;
+ MVM_io_eventloop_remove_active_work(tc, &(ci->work_idx));
}
}

@@ -675,8 +676,7 @@ static void on_connection(uv_stream_t *server, int status) {
ListenInfo *li = (ListenInfo *)server->data;
MVMThreadContext *tc = li->tc;
MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
- MVMAsyncTask *t = (MVMAsyncTask *)MVM_repr_at_pos_o(tc,
-     tc->instance->event_loop_active, li->work_idx);
+ MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, li->work_idx);

uv_tcp_t *client = MVM_malloc(sizeof(uv_tcp_t));
int r;
@@ -721,8 +721,7 @@ static void listen_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async
/* Add to work in progress. */
ListenInfo *li = (ListenInfo *)data;
li->tc = tc;
- li->work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active);
- MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task);
+ li->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);

/* Create and initialize socket and connection, and start listening. */
li->socket = MVM_malloc(sizeof(uv_tcp_t));
@@ -747,6 +746,7 @@ static void listen_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async
});
uv_close((uv_handle_t *)li->socket, free_on_close_cb);
li->socket = NULL;
+ MVM_io_eventloop_remove_active_work(tc, &(li->work_idx));
return;
}
}
@@ -756,8 +756,8 @@ static void on_listen_cancelled(uv_handle_t *handle) {
ListenInfo *li = (ListenInfo *)handle->data;
MVMThreadContext *tc = li->tc;
MVM_io_eventloop_send_cancellation_notification(tc,
-     (MVMAsyncTask *)MVM_repr_at_pos_o(tc, tc->instance->event_loop_active,
-         li->work_idx));
+     MVM_io_eventloop_get_active_work(tc, li->work_idx));
+ MVM_io_eventloop_remove_active_work(tc, &(li->work_idx));
}
static void listen_cancel(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
ListenInfo *li = (ListenInfo *)data;
16 changes: 8 additions & 8 deletions src/io/asyncsocketudp.c
@@ -39,8 +39,7 @@ static void on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const
ReadInfo *ri = (ReadInfo *)handle->data;
MVMThreadContext *tc = ri->tc;
MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
- MVMAsyncTask *t = (MVMAsyncTask *)MVM_repr_at_pos_o(tc,
-     tc->instance->event_loop_active, ri->work_idx);
+ MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, ri->work_idx);
MVM_repr_push_o(tc, arr, t->body.schedulee);
if (nread >= 0) {
MVMROOT(tc, t, {
@@ -86,6 +85,7 @@ static void on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const
if (buf->base)
MVM_free(buf->base);
uv_udp_recv_stop(handle);
+ MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
}
else {
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
@@ -102,6 +102,7 @@ static void on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const
if (buf->base)
MVM_free(buf->base);
uv_udp_recv_stop(handle);
+ MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
}
MVM_repr_push_o(tc, t->body.queue, arr);
}
@@ -114,8 +115,7 @@ static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_t
/* Add to work in progress. */
ReadInfo *ri = (ReadInfo *)data;
ri->tc = tc;
- ri->work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active);
- MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task);
+ ri->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);

/* Start reading the stream. */
handle_data = (MVMIOAsyncUDPSocketData *)ri->handle->body.data;
@@ -266,8 +266,7 @@ static void on_write(uv_udp_send_t *req, int status) {
WriteInfo *wi = (WriteInfo *)req->data;
MVMThreadContext *tc = wi->tc;
MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
- MVMAsyncTask *t = (MVMAsyncTask *)MVM_repr_at_pos_o(tc,
-     tc->instance->event_loop_active, wi->work_idx);
+ MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, wi->work_idx);
MVM_repr_push_o(tc, arr, t->body.schedulee);
if (status >= 0) {
MVMROOT(tc, arr, {
@@ -296,6 +295,7 @@ static void on_write(uv_udp_send_t *req, int status) {
if (wi->str_data)
MVM_free(wi->buf.base);
MVM_free(wi->req);
+ MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
}

/* Does setup work for an asynchronous write. */
@@ -307,8 +307,7 @@ static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_
/* Add to work in progress. */
WriteInfo *wi = (WriteInfo *)data;
wi->tc = tc;
- wi->work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active);
- MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task);
+ wi->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);

/* Encode the string, or extract buf data. */
if (wi->str_data) {
@@ -351,6 +350,7 @@ static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_
/* Cleanup handle. */
MVM_free(wi->req);
wi->req = NULL;
+ MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
}
}

35 changes: 35 additions & 0 deletions src/io/eventloop.c
@@ -153,3 +153,38 @@ void MVM_io_eventloop_send_cancellation_notification(MVMThreadContext *tc, MVMAs
if (notify_queue && notify_schedulee)
MVM_repr_push_o(tc, notify_queue, notify_schedulee);
}

/* Adds a work item to the active async task set. */
int MVM_io_eventloop_add_active_work(MVMThreadContext *tc, MVMObject *async_task) {
int work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active);
MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task);
return work_idx;
}

/* Gets an active work item from the active work eventloop. */
MVMAsyncTask * MVM_io_eventloop_get_active_work(MVMThreadContext *tc, int work_idx) {
if (work_idx >= 0 && work_idx < MVM_repr_elems(tc, tc->instance->event_loop_active)) {
MVMObject *task_obj = MVM_repr_at_pos_o(tc, tc->instance->event_loop_active, work_idx);
if (REPR(task_obj)->ID != MVM_REPR_ID_MVMAsyncTask)
MVM_panic(1, "non-AsyncTask fetched from eventloop active work list");
return (MVMAsyncTask *)task_obj;
}
else {
MVM_panic(1, "use of invalid eventloop work item index %d", work_idx);
}
}

/* Removes an active work index from the active work list, enabling any
* memory associated with it to be collected. Replaces the work index with -1
* so that any future use of the task will be a failed lookup. */
void MVM_io_eventloop_remove_active_work(MVMThreadContext *tc, int *work_idx_to_clear) {
int work_idx = *work_idx_to_clear;
if (work_idx >= 0 && work_idx < MVM_repr_elems(tc, tc->instance->event_loop_active)) {
*work_idx_to_clear = -1;
MVM_repr_bind_pos_o(tc, tc->instance->event_loop_active, work_idx, tc->instance->VMNull);
/* TODO: start to re-use the indices */
}
else {
MVM_panic(1, "cannot remove invalid eventloop work item index %d", work_idx);
}
}
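The three helpers above are used in the same way by every I/O operation touched in this commit: the setup function registers the async task as active work and stores the returned index in its per-operation Info struct, each completion callback fetches the task back by that index, and whichever path finishes or cancels the operation removes the index again so the task can eventually be collected. The following is a minimal sketch of that lifecycle, not part of the commit itself; ExampleInfo, example_setup and example_done are hypothetical names, while the MVM_* calls and signatures are the ones appearing in the diffs above.

#include "moar.h"

/* Hypothetical per-operation state, mirroring ReadInfo/WriteInfo/ConnectInfo. */
typedef struct {
    MVMThreadContext *tc;
    int               work_idx;
} ExampleInfo;

/* Setup op: register the task as active work and remember its index. */
static void example_setup(MVMThreadContext *tc, uv_loop_t *loop,
                          MVMObject *async_task, void *data) {
    ExampleInfo *ei = (ExampleInfo *)data;
    ei->tc       = tc;
    ei->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);
    /* ... start the libuv operation here, stashing ei in the handle's data ... */
}

/* Completion callback: look the task up by index, deliver the result to its
 * queue, then drop it from the active work list so it can be collected.
 * (The real callbacks also MVMROOT the task and array around allocations.) */
static void example_done(ExampleInfo *ei) {
    MVMThreadContext *tc = ei->tc;
    MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
    MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, ei->work_idx);
    MVM_repr_push_o(tc, arr, t->body.schedulee);
    /* ... push the operation's results onto arr as well ... */
    MVM_repr_push_o(tc, t->body.queue, arr);
    MVM_io_eventloop_remove_active_work(tc, &(ei->work_idx));
}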
4 changes: 4 additions & 0 deletions src/io/eventloop.h
@@ -18,3 +18,7 @@ void MVM_io_eventloop_queue_work(MVMThreadContext *tc, MVMObject *work);
void MVM_io_eventloop_cancel_work(MVMThreadContext *tc, MVMObject *task_obj,
MVMObject *notify_queue, MVMObject *notify_schedulee);
void MVM_io_eventloop_send_cancellation_notification(MVMThreadContext *tc, MVMAsyncTask *task_obj);

int MVM_io_eventloop_add_active_work(MVMThreadContext *tc, MVMObject *async_task);
MVMAsyncTask * MVM_io_eventloop_get_active_work(MVMThreadContext *tc, int work_idx);
void MVM_io_eventloop_remove_active_work(MVMThreadContext *tc, int *work_idx_to_clear);
6 changes: 2 additions & 4 deletions src/io/filewatchers.c
@@ -12,8 +12,7 @@ static void on_changed(uv_fs_event_t *handle, const char *filename, int events,
WatchInfo *wi = (WatchInfo *)handle->data;
MVMThreadContext *tc = wi->tc;
MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
- MVMAsyncTask *t = (MVMAsyncTask *)MVM_repr_at_pos_o(tc,
-     tc->instance->event_loop_active, wi->work_idx);
+ MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, wi->work_idx);
MVM_repr_push_o(tc, arr, t->body.schedulee);
MVMROOT(tc, t, {
MVMROOT(tc, arr, {
@@ -46,10 +45,9 @@ static void setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task,
int r;

/* Add task to active list. */
- wi->work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active);
+ wi->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);
wi->tc = tc;
wi->handle.data = wi;
- MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task);

/* Start watching. */
uv_fs_event_init(loop, &wi->handle);