Skip to content

Commit

Permalink
Sort out semantics of closed async sockets.
Browse files Browse the repository at this point in the history
* Further requests to close will have no effect, so close of an
  async socket is idempotent. Useful when people are doing timeout
  things that race with normal close. Previously could bring down the
  VM or even cause SEGV.
* A write on a closed socket will properly signal the error through
  the callback, NOT bring down the VM now.
* A read on a closed socket will behave like a read that started just
  before the socket was closed - that is, it will signal completion.
  On the Perl 6 side, this will be like a Supply that completes right
  away in both cases. Again, this avoids races. Previously, this could
  fail in all kinds of ways, potentially with SEGV though I didn't
  manage to provoke it.
  • Loading branch information
jnthn committed Feb 9, 2017
1 parent 65972ec commit 2b0739d
Showing 1 changed file with 75 additions and 17 deletions.
92 changes: 75 additions & 17 deletions src/io/asyncsocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,35 @@ static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
/* Does setup work for setting up asynchronous reads. */
static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
MVMIOAsyncSocketData *handle_data;
ReadInfo *ri;
int r;

/* Ensure not closed. */
ri = (ReadInfo *)data;
handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data;
if (!handle_data->handle || uv_is_closing((uv_handle_t *)handle_data->handle)) {
/* Closed, so immediately send done. */
MVMAsyncTask *t = (MVMAsyncTask *)async_task;
MVMROOT(tc, t, {
MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
MVM_repr_push_o(tc, arr, t->body.schedulee);
MVMROOT(tc, arr, {
MVMObject *final = MVM_repr_box_int(tc,
tc->instance->boot_types.BOOTInt, ri->seq_number);
MVM_repr_push_o(tc, arr, final);
});
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
MVM_repr_push_o(tc, t->body.queue, arr);
});
return;
}

/* Add to work in progress. */
ReadInfo *ri = (ReadInfo *)data;
ri->tc = tc;
ri->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);

/* Start reading the stream. */
handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data;
handle_data->handle->data = data;
if ((r = uv_read_start(handle_data->handle, on_alloc, on_read)) < 0) {
/* Error; need to notify. */
Expand Down Expand Up @@ -298,13 +318,34 @@ static void on_write(uv_write_t *req, int status) {
/* Does setup work for an asynchronous write. */
static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
MVMIOAsyncSocketData *handle_data;
WriteInfo *wi;
char *output;
int output_size, r;

/* Ensure not closed. */
wi = (WriteInfo *)data;
handle_data = (MVMIOAsyncSocketData *)wi->handle->body.data;
if (!handle_data->handle || uv_is_closing((uv_handle_t *)handle_data->handle)) {
MVMROOT(tc, async_task, {
MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
MVMAsyncTask *t = (MVMAsyncTask *)async_task;
MVM_repr_push_o(tc, arr, t->body.schedulee);
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
MVMROOT(tc, arr, {
MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
tc->instance->VMString, "Cannot write to a closed socket");
MVMObject *msg_box = MVM_repr_box_str(tc,
tc->instance->boot_types.BOOTStr, msg_str);
MVM_repr_push_o(tc, arr, msg_box);
});
MVM_repr_push_o(tc, t->body.queue, arr);
});
return;
}

/* Add to work in progress. */
WriteInfo *wi = (WriteInfo *)data;
wi->tc = tc;
wi->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);
wi->tc = tc;
wi->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);

/* Encode the string, or extract buf data. */
if (wi->str_data) {
Expand All @@ -322,10 +363,6 @@ static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_
wi->req = MVM_malloc(sizeof(uv_write_t));
wi->buf = uv_buf_init(output, output_size);
wi->req->data = data;
handle_data = (MVMIOAsyncSocketData *)wi->handle->body.data;

if (uv_is_closing((uv_handle_t *)handle_data->handle))
MVM_exception_throw_adhoc(tc, "cannot write to a closed socket");

if ((r = uv_write(wi->req, handle_data->handle, &(wi->buf), 1, on_write)) < 0) {
/* Error; need to notify. */
Expand Down Expand Up @@ -456,34 +493,55 @@ static MVMAsyncTask * write_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObjec
return task;
}

/* Info we convey about a socket close task. */
typedef struct {
MVMOSHandle *handle;
} CloseInfo;

/* Does an asynchronous close (since it must run on the event loop). */
static void close_perform(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
uv_handle_t *handle = (uv_handle_t *)data;
CloseInfo *ci = (CloseInfo *)data;
MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ci->handle->body.data;
uv_handle_t *handle = (uv_handle_t *)handle_data->handle;
if (handle && !uv_is_closing(handle)) {
handle_data->handle = NULL;
uv_close(handle, free_on_close_cb);
}
}

if (uv_is_closing(handle))
MVM_exception_throw_adhoc(tc, "cannot close a closed socket");
/* Marks objects for a close task. */
static void close_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
CloseInfo *ci = (CloseInfo *)data;
MVM_gc_worklist_add(tc, worklist, &ci->handle);
}

uv_close(handle, free_on_close_cb);
/* Frees info for a close task. */
static void close_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
if (data)
MVM_free(data);
}

/* Operations table for async close task. */
static const MVMAsyncTaskOps close_op_table = {
close_perform,
NULL,
NULL,
NULL
close_gc_mark,
close_gc_free
};

static MVMint64 close_socket(MVMThreadContext *tc, MVMOSHandle *h) {
MVMIOAsyncSocketData *data = (MVMIOAsyncSocketData *)h->body.data;
MVMAsyncTask *task;
CloseInfo *ci;

MVMROOT(tc, h, {
task = (MVMAsyncTask *)MVM_repr_alloc_init(tc,
tc->instance->boot_types.BOOTAsync);
});
task->body.ops = &close_op_table;
task->body.data = data->handle;
task->body.ops = &close_op_table;
ci = MVM_calloc(1, sizeof(CloseInfo));
MVM_ASSIGN_REF(tc, &(task->common.header), ci->handle, h);
task->body.data = ci;
MVM_io_eventloop_queue_work(tc, (MVMObject *)task);

return 0;
Expand Down

0 comments on commit 2b0739d

Please sign in to comment.