diff --git a/src/io/asyncsocket.c b/src/io/asyncsocket.c index a7ba49b109..feb6513089 100644 --- a/src/io/asyncsocket.c +++ b/src/io/asyncsocket.c @@ -8,11 +8,12 @@ typedef struct { /* Info we convey about a read task. */ typedef struct { + MVMThreadContext *tc; + int work_idx; MVMOSHandle *handle; MVMObject *buf_type; int seq_number; - MVMThreadContext *tc; - int work_idx; + int error; } ReadInfo; /* Allocates a buffer of the suggested size. */ @@ -29,48 +30,56 @@ static void free_on_close_cb(uv_handle_t *handle) { /* Read handler. */ static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { - ReadInfo *ri = (ReadInfo *)handle->data; - MVMThreadContext *tc = ri->tc; - MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); - MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, ri->work_idx); - MVM_repr_push_o(tc, arr, t->body.schedulee); - if (nread >= 0) { - MVMROOT2(tc, t, arr, { - MVMArray *res_buf; + ReadInfo *ri = (ReadInfo *)handle->data; + MVMThreadContext *tc = ri->tc; + MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, ri->work_idx); + MVMObject *arr; - /* Push the sequence number. */ - MVMObject *seq_boxed = MVM_repr_box_int(tc, + if (nread >= 0) { + MVMROOT3(tc, t, ri->handle, ri->buf_type, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); + MVM_repr_push_o(tc, arr, t->body.schedulee); + MVMROOT4(tc, t, arr, ri->handle, ri->buf_type, { + /* Push the sequence number, produce a buffer, and push that as + * well. */ + MVMObject *seq_boxed = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, ri->seq_number++); - MVM_repr_push_o(tc, arr, seq_boxed); - - /* Produce a buffer and push it. */ - res_buf = (MVMArray *)MVM_repr_alloc_init(tc, ri->buf_type); + MVMArray *res_buf = (MVMArray *)MVM_repr_alloc_init(tc, ri->buf_type); res_buf->body.slots.i8 = (MVMint8 *)buf->base; res_buf->body.start = 0; res_buf->body.ssize = buf->len; res_buf->body.elems = nread; + MVM_repr_push_o(tc, arr, seq_boxed); MVM_repr_push_o(tc, arr, (MVMObject *)res_buf); - - /* Finally, no error. */ - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); }); + /* Finally, no error. */ + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, t->body.queue, arr); } else { MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data; - uv_handle_t *conn_handle = (uv_handle_t *)handle_data->handle; + uv_handle_t *conn_handle = (uv_handle_t *)handle_data->handle; + + MVMROOT3(tc, t, ri->handle, ri->buf_type, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); + MVM_repr_push_o(tc, arr, t->body.schedulee); if (nread == UV_EOF) { - MVMROOT2(tc, t, arr, { + /* End of file; push read count. */ + MVMROOT4(tc, t, arr, ri->handle, ri->buf_type, { MVMObject *final = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, ri->seq_number); MVM_repr_push_o(tc, arr, final); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); }); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); } else { + /* Error; need to notify. 
*/ MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVMROOT2(tc, t, arr, { + MVMROOT4(tc, t, arr, ri->handle, ri->buf_type, { MVMString *msg_str = MVM_string_ascii_decode_nt(tc, tc->instance->VMString, uv_strerror(nread)); MVMObject *msg_box = MVM_repr_box_str(tc, @@ -78,77 +87,94 @@ static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { MVM_repr_push_o(tc, arr, msg_box); }); } + MVM_repr_push_o(tc, t->body.queue, arr); + + /* Clean up. */ if (buf->base) MVM_free(buf->base); - MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx)); if (conn_handle && !uv_is_closing(conn_handle)) { handle_data->handle = NULL; uv_close(conn_handle, free_on_close_cb); } + + MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx)); } - MVM_repr_push_o(tc, t->body.queue, arr); } /* Does setup work for setting up asynchronous reads. */ static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { - MVMIOAsyncSocketData *handle_data; - ReadInfo *ri; - int r; + ReadInfo *ri = (ReadInfo *)data; + MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data; + uv_handle_t *handle; + MVMAsyncTask *t = (MVMAsyncTask *)async_task; + MVMObject *arr; + + /* Add to work in progress. */ + ri->tc = tc; + ri->work_idx = MVM_io_eventloop_add_active_work(tc, async_task); /* Ensure not closed. */ - ri = (ReadInfo *)data; - handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data; - if (!handle_data->handle || uv_is_closing((uv_handle_t *)handle_data->handle)) { - /* Closed, so immediately send done. */ - MVMAsyncTask *t = (MVMAsyncTask *)async_task; - MVMROOT(tc, t, { - MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); - MVM_repr_push_o(tc, arr, t->body.schedulee); - MVMROOT(tc, arr, { - MVMObject *final = MVM_repr_box_int(tc, - tc->instance->boot_types.BOOTInt, ri->seq_number); - MVM_repr_push_o(tc, arr, final); - }); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, t->body.queue, arr); - }); - return; - } + if (handle_data->handle == NULL) + goto closed; - /* Add to work in progress. */ - ri->tc = tc; - ri->work_idx = MVM_io_eventloop_add_active_work(tc, async_task); + /* Get our handle; set its data so we can access the ReadInfo struct in + * on_read. */ + handle = (uv_handle_t *)handle_data->handle; + handle->data = data; + + /* Ensure not closed. */ + if (uv_is_closing(handle)) + goto closed; /* Start reading the stream. */ - handle_data->handle->data = data; - if ((r = uv_read_start(handle_data->handle, on_alloc, on_read)) < 0) { - /* Error; need to notify. 
*/ - MVMROOT(tc, async_task, { - MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); - MVMAsyncTask *t = (MVMAsyncTask *)async_task; - MVM_repr_push_o(tc, arr, t->body.schedulee); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVMROOT(tc, arr, { - MVMString *msg_str = MVM_string_ascii_decode_nt(tc, - tc->instance->VMString, uv_strerror(r)); - MVMObject *msg_box = MVM_repr_box_str(tc, - tc->instance->boot_types.BOOTStr, msg_str); - MVM_repr_push_o(tc, arr, msg_box); - }); - MVM_repr_push_o(tc, t->body.queue, arr); - }); - MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx)); - } + if ((ri->error = uv_read_start(handle_data->handle, on_alloc, on_read)) == 0) + /* Success; finish up in on_read. */ + return; + + /* Error; need to notify. */ + MVMROOT3(tc, t, ri->handle, ri->buf_type, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); + MVM_repr_push_o(tc, arr, t->body.schedulee); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVMROOT4(tc, t, arr, ri->handle, ri->buf_type, { + MVMString *msg_str = MVM_string_ascii_decode_nt(tc, + tc->instance->VMString, uv_strerror(ri->error)); + MVMObject *msg_box = MVM_repr_box_str(tc, + tc->instance->boot_types.BOOTStr, msg_str); + MVM_repr_push_o(tc, arr, msg_box); + }); + MVM_repr_push_o(tc, t->body.queue, arr); + MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx)); + return; + +closed: + /* Closed, so immediately send done. */ + MVMROOT3(tc, t, ri->handle, ri->buf_type, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); + MVM_repr_push_o(tc, arr, t->body.schedulee); + MVMROOT4(tc, t, arr, ri->handle, ri->buf_type, { + MVMObject *final = MVM_repr_box_int(tc, + tc->instance->boot_types.BOOTInt, ri->seq_number); + MVM_repr_push_o(tc, arr, final); + }); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, t->body.queue, arr); + MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx)); + return; } /* Stops reading. */ static void read_cancel(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { ReadInfo *ri = (ReadInfo *)data; + if (ri->work_idx >= 0) { MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data; - if (handle_data->handle && !uv_is_closing((uv_handle_t *)handle_data->handle)) + uv_handle_t *handle = (uv_handle_t *)handle_data->handle; + if (handle != NULL && !uv_is_closing(handle)) uv_read_stop(handle_data->handle); MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx)); } @@ -163,7 +189,7 @@ static void read_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *workli /* Frees info for a read task. */ static void read_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) { - if (data) + if (data != NULL) MVM_free(data); } @@ -179,7 +205,7 @@ static const MVMAsyncTaskOps read_op_table = { static MVMAsyncTask * read_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue, MVMObject *schedulee, MVMObject *buf_type, MVMObject *async_type) { MVMAsyncTask *task; - ReadInfo *ri; + ReadInfo *ri; /* Validate REPRs. */ if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue) @@ -199,19 +225,20 @@ static MVMAsyncTask * read_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject } /* Create async task handle. 
*/ - MVMROOT4(tc, queue, schedulee, h, buf_type, { + MVMROOT5(tc, h, queue, schedulee, buf_type, async_type, { task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type); }); + MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue); MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee); - task->body.ops = &read_op_table; ri = MVM_calloc(1, sizeof(ReadInfo)); MVM_ASSIGN_REF(tc, &(task->common.header), ri->buf_type, buf_type); MVM_ASSIGN_REF(tc, &(task->common.header), ri->handle, h); task->body.data = ri; + task->body.ops = &read_op_table; /* Hand the task off to the event loop. */ - MVMROOT(tc, task, { + MVMROOT6(tc, h, queue, schedulee, buf_type, async_type, task, { MVM_io_eventloop_queue_work(tc, (MVMObject *)task); }); @@ -220,110 +247,131 @@ static MVMAsyncTask * read_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject /* Info we convey about a write task. */ typedef struct { + MVMThreadContext *tc; + int work_idx; MVMOSHandle *handle; MVMObject *buf_data; uv_write_t *req; uv_buf_t buf; - MVMThreadContext *tc; - int work_idx; + int error; } WriteInfo; /* Completion handler for an asynchronous write. */ static void on_write(uv_write_t *req, int status) { - WriteInfo *wi = (WriteInfo *)req->data; - MVMThreadContext *tc = wi->tc; - MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); - MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, wi->work_idx); - MVM_repr_push_o(tc, arr, t->body.schedulee); - if (status >= 0) { - MVMROOT2(tc, arr, t, { - MVMObject *bytes_box = MVM_repr_box_int(tc, - tc->instance->boot_types.BOOTInt, - wi->buf.len); - MVM_repr_push_o(tc, arr, bytes_box); - }); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - } - else { - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - MVMROOT2(tc, arr, t, { - MVMString *msg_str = MVM_string_ascii_decode_nt(tc, - tc->instance->VMString, uv_strerror(status)); - MVMObject *msg_box = MVM_repr_box_str(tc, - tc->instance->boot_types.BOOTStr, msg_str); - MVM_repr_push_o(tc, arr, msg_box); - }); - } - MVM_repr_push_o(tc, t->body.queue, arr); - MVM_free(wi->req); - MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx)); -} - -/* Does setup work for an asynchronous write. */ -static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { - MVMIOAsyncSocketData *handle_data; - MVMArray *buffer; - WriteInfo *wi; - char *output; - int output_size, r; - - /* Ensure not closed. */ - wi = (WriteInfo *)data; - handle_data = (MVMIOAsyncSocketData *)wi->handle->body.data; - if (!handle_data->handle || uv_is_closing((uv_handle_t *)handle_data->handle)) { - MVMROOT(tc, async_task, { - MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); - MVMAsyncTask *t = (MVMAsyncTask *)async_task; - MVM_repr_push_o(tc, arr, t->body.schedulee); + WriteInfo *wi = (WriteInfo *)req->data; + MVMThreadContext *tc = wi->tc; + MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, wi->work_idx); + MVMROOT3(tc, t, wi->handle, wi->buf_data, { + MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + MVM_repr_push_o(tc, arr, t->body.schedulee); + if (status == 0) { + /* Success; push write length. */ + MVMROOT(tc, arr, { + MVMObject *bytes_box = MVM_repr_box_int(tc, + tc->instance->boot_types.BOOTInt, + wi->buf.len); + MVM_repr_push_o(tc, arr, bytes_box); + }); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + } + else { + /* Error; need to notify. 
*/ MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); MVMROOT(tc, arr, { MVMString *msg_str = MVM_string_ascii_decode_nt(tc, - tc->instance->VMString, "Cannot write to a closed socket"); + tc->instance->VMString, uv_strerror(status)); MVMObject *msg_box = MVM_repr_box_str(tc, tc->instance->boot_types.BOOTStr, msg_str); MVM_repr_push_o(tc, arr, msg_box); }); - MVM_repr_push_o(tc, t->body.queue, arr); - }); - return; - } + } + MVM_repr_push_o(tc, t->body.queue, arr); + }); + MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx)); +} + +/* Does setup work for an asynchronous write. */ +static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { + WriteInfo *wi = (WriteInfo *)data; + MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)wi->handle->body.data; + uv_handle_t *handle; + MVMArray *buffer; + char *output; + int output_size; + MVMAsyncTask *t = (MVMAsyncTask *)async_task; + MVMObject *arr; /* Add to work in progress. */ - wi->tc = tc; + wi->tc = tc; wi->work_idx = MVM_io_eventloop_add_active_work(tc, async_task); /* Extract buf data. */ - buffer = (MVMArray *)wi->buf_data; - output = (char *)(buffer->body.slots.i8 + buffer->body.start); + buffer = (MVMArray *)wi->buf_data; + output = (char *)(buffer->body.slots.i8 + buffer->body.start); output_size = (int)buffer->body.elems; /* Create and initialize write request. */ - wi->req = MVM_malloc(sizeof(uv_write_t)); - wi->buf = uv_buf_init(output, output_size); - wi->req->data = data; + wi->req = MVM_malloc(sizeof(uv_write_t)); + wi->req->data = data; + wi->buf = uv_buf_init(output, output_size); - if ((r = uv_write(wi->req, handle_data->handle, &(wi->buf), 1, on_write)) < 0) { - /* Error; need to notify. */ - MVMROOT(tc, async_task, { - MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); - MVMAsyncTask *t = (MVMAsyncTask *)async_task; - MVM_repr_push_o(tc, arr, t->body.schedulee); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - MVMROOT(tc, arr, { - MVMString *msg_str = MVM_string_ascii_decode_nt(tc, - tc->instance->VMString, uv_strerror(r)); - MVMObject *msg_box = MVM_repr_box_str(tc, - tc->instance->boot_types.BOOTStr, msg_str); - MVM_repr_push_o(tc, arr, msg_box); - }); - MVM_repr_push_o(tc, t->body.queue, arr); - }); + /* Ensure not closed. */ + if (handle_data->handle == NULL) + goto closed; + + /* Get our handle. */ + handle = (uv_handle_t *)handle_data->handle; + + /* Ensure not closed. */ + if (uv_is_closing(handle)) + goto closed; + + /* Do our write. */ + if ((wi->error = uv_write(wi->req, handle_data->handle, &(wi->buf), 1, on_write)) == 0) + /* Success; finish up in on_write. */ + return; + + /* Error; need to notify. */ + MVMROOT3(tc, t, wi->handle, wi->buf_data, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); + MVM_repr_push_o(tc, arr, t->body.schedulee); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + MVMROOT4(tc, t, arr, wi->handle, wi->buf_data, { + MVMString *msg_str = MVM_string_ascii_decode_nt(tc, + tc->instance->VMString, uv_strerror(wi->error)); + MVMObject *msg_box = MVM_repr_box_str(tc, + tc->instance->boot_types.BOOTStr, msg_str); + MVM_repr_push_o(tc, arr, msg_box); + }); + MVM_repr_push_o(tc, t->body.queue, arr); - /* Cleanup handle. */ - MVM_free(wi->req); - wi->req = NULL; - MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx)); + /* Clean up our handle. 
*/ + if (handle != NULL && !uv_is_closing(handle)) { + handle_data->handle = NULL; + uv_close(handle, free_on_close_cb); } + + MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx)); + return; + +closed: + /* Handle closed; need to notify. */ + MVMROOT3(tc, t, wi->handle, wi->buf_data, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); + MVM_repr_push_o(tc, arr, t->body.schedulee); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + MVMROOT4(tc, t, arr, wi->handle, wi->buf_data, { + MVMString *msg_str = MVM_string_ascii_decode_nt(tc, + tc->instance->VMString, "Cannot write to a closed socket"); + MVMObject *msg_box = MVM_repr_box_str(tc, + tc->instance->boot_types.BOOTStr, msg_str); + MVM_repr_push_o(tc, arr, msg_box); + }); + MVM_repr_push_o(tc, t->body.queue, arr); + MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx)); + return; } /* Marks objects for a write task. */ @@ -335,8 +383,12 @@ static void write_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *workl /* Frees info for a write task. */ static void write_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) { - if (data) + if (data != NULL) { + WriteInfo *wi = (WriteInfo *)data; + if (wi->req != NULL) + MVM_free_null(wi->req); MVM_free(data); + } } /* Operations table for async write task. */ @@ -367,19 +419,20 @@ static MVMAsyncTask * write_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObjec MVM_exception_throw_adhoc(tc, "asyncwritebytes requires a native array of uint8 or int8"); /* Create async task handle. */ - MVMROOT4(tc, queue, schedulee, h, buffer, { + MVMROOT5(tc, h, queue, schedulee, buffer, async_type, { task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type); }); + + wi = MVM_calloc(1, sizeof(WriteInfo)); MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue); MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee); - task->body.ops = &write_op_table; - wi = MVM_calloc(1, sizeof(WriteInfo)); MVM_ASSIGN_REF(tc, &(task->common.header), wi->handle, h); MVM_ASSIGN_REF(tc, &(task->common.header), wi->buf_data, buffer); task->body.data = wi; + task->body.ops = &write_op_table; /* Hand the task off to the event loop. */ - MVMROOT(tc, task, { + MVMROOT6(tc, h, queue, schedulee, buffer, async_type, task, { MVM_io_eventloop_queue_work(tc, (MVMObject *)task); }); @@ -393,10 +446,10 @@ typedef struct { /* Does an asynchronous close (since it must run on the event loop). */ static void close_perform(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { - CloseInfo *ci = (CloseInfo *)data; + CloseInfo *ci = (CloseInfo *)data; MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ci->handle->body.data; - uv_handle_t *handle = (uv_handle_t *)handle_data->handle; - if (handle && !uv_is_closing(handle)) { + uv_handle_t *handle = (uv_handle_t *)handle_data->handle; + if (handle != NULL && !uv_is_closing(handle)) { handle_data->handle = NULL; uv_close(handle, free_on_close_cb); } @@ -410,7 +463,7 @@ static void close_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *workl /* Frees info for a close task. 
*/ static void close_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) { - if (data) + if (data != NULL) MVM_free(data); } @@ -425,17 +478,18 @@ static const MVMAsyncTaskOps close_op_table = { static MVMint64 close_socket(MVMThreadContext *tc, MVMOSHandle *h) { MVMAsyncTask *task; - CloseInfo *ci; + CloseInfo *ci; MVMROOT(tc, h, { - task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, - tc->instance->boot_types.BOOTAsync); + task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTAsync); }); - task->body.ops = &close_op_table; - ci = MVM_calloc(1, sizeof(CloseInfo)); + ci = MVM_calloc(1, sizeof(CloseInfo)); MVM_ASSIGN_REF(tc, &(task->common.header), ci->handle, h); task->body.data = ci; - MVM_io_eventloop_queue_work(tc, (MVMObject *)task); + task->body.ops = &close_op_table; + MVMROOT2(tc, h, task, { + MVM_io_eventloop_queue_work(tc, (MVMObject *)task); + }); return 0; } @@ -449,8 +503,8 @@ static MVMint64 socket_is_tty(MVMThreadContext *tc, MVMOSHandle *h) { static MVMint64 socket_handle(MVMThreadContext *tc, MVMOSHandle *h) { MVMIOAsyncSocketData *data = (MVMIOAsyncSocketData *)h->body.data; uv_handle_t *handle = (uv_handle_t *)data->handle; - int fd; - uv_os_fd_t fh; + int fd; + uv_os_fd_t fh; uv_fileno(handle, &fh); fd = uv_open_osfhandle(fh); @@ -480,61 +534,80 @@ static const MVMIOOps op_table = { NULL }; +/* Note: every MVMObject from the function calling this that needs to be rooted + * before allocating the host and port already is. */ static void push_name_and_port(MVMThreadContext *tc, struct sockaddr_storage *name, MVMObject *arr) { - char addrstr[INET6_ADDRSTRLEN + 1]; + char addrstr[INET6_ADDRSTRLEN + 1]; /* XXX windows support kludge. 64 bit is much too big, but we'll * get the proper data from the struct anyway, however windows * decides to declare it. 
*/ MVMuint64 port; - MVMObject *host_o; - MVMObject *port_o; + + if (name == NULL) + goto error; + switch (name->ss_family) { case AF_INET6: { - uv_ip6_name((struct sockaddr_in6*)name, addrstr, INET6_ADDRSTRLEN + 1); - port = ntohs(((struct sockaddr_in6*)name)->sin6_port); + struct sockaddr_in6 *addr = (struct sockaddr_in6 *)name; + uv_ip6_name(addr, addrstr, INET6_ADDRSTRLEN + 1); + port = ntohs(addr->sin6_port); break; } case AF_INET: { - uv_ip4_name((struct sockaddr_in*)name, addrstr, INET6_ADDRSTRLEN + 1); - port = ntohs(((struct sockaddr_in*)name)->sin_port); + struct sockaddr_in *addr = (struct sockaddr_in *)name; + uv_ip4_name(addr, addrstr, INET6_ADDRSTRLEN + 1); + port = ntohs(addr->sin_port); break; } default: - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - return; - break; + goto error; } + MVMROOT(tc, arr, { - port_o = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, port); - MVMROOT(tc, port_o, { - host_o = (MVMObject *)MVM_repr_box_str(tc, tc->instance->boot_types.BOOTStr, - MVM_string_ascii_decode_nt(tc, tc->instance->VMString, addrstr)); + MVMObject *host_o = (MVMObject *)MVM_repr_box_str(tc, tc->instance->boot_types.BOOTStr, + MVM_string_ascii_decode_nt(tc, tc->instance->VMString, addrstr)); + MVMObject *port_o; + MVM_repr_push_o(tc, arr, host_o); + MVMROOT(tc, host_o, { + port_o = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, port); }); + MVM_repr_push_o(tc, arr, port_o); }); - MVM_repr_push_o(tc, arr, host_o); - MVM_repr_push_o(tc, arr, port_o); + return; + +error: + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + return; } /* Info we convey about a connection attempt task. */ typedef struct { - struct sockaddr *dest; - uv_tcp_t *socket; - uv_connect_t *connect; MVMThreadContext *tc; + uv_loop_t *loop; + MVMObject *async_task; int work_idx; + struct addrinfo *records; + struct addrinfo *cur_record; + uv_tcp_t *socket; + uv_connect_t *connect; + int error; } ConnectInfo; /* When a connection takes place, need to send result. */ static void on_connect(uv_connect_t* req, int status) { - ConnectInfo *ci = (ConnectInfo *)req->data; - MVMThreadContext *tc = ci->tc; - MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); - MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, ci->work_idx); + ConnectInfo *ci = (ConnectInfo *)req->data; + MVMThreadContext *tc = ci->tc; + MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, ci->work_idx); + MVMObject *arr; + + MVMROOT(tc, t, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); MVM_repr_push_o(tc, arr, t->body.schedulee); - if (status >= 0) { + if (status == 0) { /* Allocate and set up handle. 
*/ - MVMROOT2(tc, arr, t, { + MVMROOT2(tc, t, arr, { MVMOSHandle *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO); MVMIOAsyncSocketData *data = MVM_calloc(1, sizeof(MVMIOAsyncSocketData)); data->handle = (uv_stream_t *)ci->socket; @@ -542,87 +615,110 @@ static void on_connect(uv_connect_t* req, int status) { result->body.data = data; MVM_repr_push_o(tc, arr, (MVMObject *)result); MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + { - struct sockaddr_storage sockaddr; - int name_len = sizeof(struct sockaddr_storage); + struct sockaddr_storage name; + int name_len = sizeof(struct sockaddr_storage); - uv_tcp_getpeername(ci->socket, (struct sockaddr *)&sockaddr, &name_len); - push_name_and_port(tc, &sockaddr, arr); + uv_tcp_getpeername(ci->socket, (struct sockaddr *)&name, &name_len); + push_name_and_port(tc, &name, arr); - uv_tcp_getsockname(ci->socket, (struct sockaddr *)&sockaddr, &name_len); - push_name_and_port(tc, &sockaddr, arr); + uv_tcp_getsockname(ci->socket, (struct sockaddr *)&name, &name_len); + push_name_and_port(tc, &name, arr); } }); } else { + /* Error; need to notify. */ MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO); - MVMROOT2(tc, arr, t, { + MVMROOT2(tc, t, arr, { MVMString *msg_str = MVM_string_ascii_decode_nt(tc, tc->instance->VMString, uv_strerror(status)); MVMObject *msg_box = MVM_repr_box_str(tc, tc->instance->boot_types.BOOTStr, msg_str); MVM_repr_push_o(tc, arr, msg_box); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); }); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); } MVM_repr_push_o(tc, t->body.queue, arr); - MVM_free(req); + MVM_io_eventloop_remove_active_work(tc, &(ci->work_idx)); +} + +/* Does the actual work of making the connection. */ +static void do_connect_setup(uv_handle_t *handle) { + ConnectInfo *ci = (ConnectInfo *)handle->data; + MVMThreadContext *tc = ci->tc; + MVMAsyncTask *t = (MVMAsyncTask *)ci->async_task; + MVMObject *arr; + + for (; ci->cur_record != NULL; ci->cur_record = ci->cur_record->ai_next) { + if ((ci->error = uv_tcp_init(ci->loop, ci->socket)) != 0) + continue; + + if ((ci->error = uv_tcp_connect(ci->connect, ci->socket, ci->cur_record->ai_addr, on_connect)) != 0) + /* Error; try again with the next address, if any, before throwing. */ + uv_close(handle, do_connect_setup); + + /* If we succeeded, finish up in on_connect. */ + return; + } + + /* Error; no addresses could be used, so we need to notify. 
*/ + MVMROOT(tc, t, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); + MVM_repr_push_o(tc, arr, t->body.schedulee); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO); + MVMROOT2(tc, t, arr, { + MVMString *msg_str = MVM_string_ascii_decode_nt(tc, + tc->instance->VMString, uv_strerror(ci->error)); + MVMObject *msg_box = MVM_repr_box_str(tc, + tc->instance->boot_types.BOOTStr, msg_str); + MVM_repr_push_o(tc, arr, msg_box); + }); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + MVM_repr_push_o(tc, t->body.queue, arr); + + /* Clean up handles. */ + if (handle != NULL && !uv_is_closing(handle)) { + ci->socket = NULL; + uv_close(handle, free_on_close_cb); + } + MVM_free_null(ci->connect); + MVM_io_eventloop_remove_active_work(tc, &(ci->work_idx)); } /* Initilalize the connection on the event loop. */ static void connect_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { - int r; - /* Add to work in progress. */ ConnectInfo *ci = (ConnectInfo *)data; - ci->tc = tc; - ci->work_idx = MVM_io_eventloop_add_active_work(tc, async_task); + ci->tc = tc; + ci->loop = loop; + ci->async_task = async_task; + ci->work_idx = MVM_io_eventloop_add_active_work(tc, async_task); /* Create and initialize socket and connection. */ ci->socket = MVM_malloc(sizeof(uv_tcp_t)); + ci->socket->data = data; ci->connect = MVM_malloc(sizeof(uv_connect_t)); ci->connect->data = data; - if ((r = uv_tcp_init(loop, ci->socket)) < 0 || - (r = uv_tcp_connect(ci->connect, ci->socket, ci->dest, on_connect)) < 0) { - /* Error; need to notify. */ - MVMROOT(tc, async_task, { - MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); - MVMAsyncTask *t = (MVMAsyncTask *)async_task; - MVM_repr_push_o(tc, arr, t->body.schedulee); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO); - MVMROOT(tc, arr, { - MVMString *msg_str = MVM_string_ascii_decode_nt(tc, - tc->instance->VMString, uv_strerror(r)); - MVMObject *msg_box = MVM_repr_box_str(tc, - tc->instance->boot_types.BOOTStr, msg_str); - MVM_repr_push_o(tc, arr, msg_box); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - }); - MVM_repr_push_o(tc, t->body.queue, arr); - }); - /* Cleanup handles. */ - MVM_free(ci->connect); - ci->connect = NULL; - uv_close((uv_handle_t *)ci->socket, free_on_close_cb); - ci->socket = NULL; - MVM_io_eventloop_remove_active_work(tc, &(ci->work_idx)); - } + do_connect_setup((uv_handle_t *)ci->socket); } /* Frees info for a connection task. */ static void connect_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) { if (data) { ConnectInfo *ci = (ConnectInfo *)data; - if (ci->dest) - MVM_free(ci->dest); + if (ci->records != NULL) + freeaddrinfo(ci->records); MVM_free(ci); } } @@ -640,9 +736,9 @@ static const MVMAsyncTaskOps connect_op_table = { MVMObject * MVM_io_socket_connect_async(MVMThreadContext *tc, MVMObject *queue, MVMObject *schedulee, MVMString *host, MVMint64 port, MVMObject *async_type) { - MVMAsyncTask *task; - ConnectInfo *ci; - struct sockaddr *dest; + MVMAsyncTask *task; + struct addrinfo *records; + ConnectInfo *ci; /* Validate REPRs. 
*/ if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue) @@ -652,24 +748,23 @@ MVMObject * MVM_io_socket_connect_async(MVMThreadContext *tc, MVMObject *queue, MVM_exception_throw_adhoc(tc, "asyncconnect result type must have REPR AsyncTask"); - /* Resolve hostname. (Could be done asynchronously too.) */ - MVMROOT3(tc, queue, schedulee, async_type, { - dest = MVM_io_resolve_host_name(tc, host, port, SOCKET_FAMILY_UNSPEC); - }); - /* Create async task handle. */ - MVMROOT2(tc, queue, schedulee, { - task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type); + MVMROOT4(tc, queue, schedulee, host, async_type, { + /* Resolve hostname. (Could be done asynchronously too.) */ + records = MVM_io_resolve_host_name(tc, host, port, SOCKET_FAMILY_UNSPEC, SOCKET_TYPE_STREAM, SOCKET_PROTOCOL_TCP, 0); + /* Create async task handle. */ + task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type); }); - MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue); - MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee); - task->body.ops = &connect_op_table; + + MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue); MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee); ci = MVM_calloc(1, sizeof(ConnectInfo)); - ci->dest = dest; + ci->records = records; + ci->cur_record = records; task->body.data = ci; + task->body.ops = &connect_op_table; /* Hand the task off to the event loop. */ - MVMROOT(tc, task, { + MVMROOT5(tc, queue, schedulee, host, async_type, task, { MVM_io_eventloop_queue_work(tc, (MVMObject *)task); }); @@ -678,31 +773,37 @@ MVMObject * MVM_io_socket_connect_async(MVMThreadContext *tc, MVMObject *queue, /* Info we convey about a socket listen task. */ typedef struct { - struct sockaddr *dest; - uv_tcp_t *socket; MVMThreadContext *tc; + uv_loop_t *loop; + MVMObject *async_task; int work_idx; int backlog; + struct addrinfo *records; + struct addrinfo *cur_record; + uv_tcp_t *socket; + int error; } ListenInfo; - /* Handles an incoming connection. */ static void on_connection(uv_stream_t *server, int status) { ListenInfo *li = (ListenInfo *)server->data; MVMThreadContext *tc = li->tc; - MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, li->work_idx); - + MVMObject *arr; uv_tcp_t *client = MVM_malloc(sizeof(uv_tcp_t)); int r; - uv_tcp_init(server->loop, client); + MVMROOT2(tc, t, li->async_task, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); MVM_repr_push_o(tc, arr, t->body.schedulee); - if ((r = uv_accept(server, (uv_stream_t *)client)) == 0) { + + if ((r = uv_tcp_init(server->loop, client)) == 0 && + (r = uv_accept(server, (uv_stream_t *)client)) == 0) { /* Allocate and set up handle. 
*/ - MVMROOT2(tc, arr, t, { - struct sockaddr_storage sockaddr; - int name_len = sizeof(struct sockaddr_storage); + MVMROOT3(tc, t, arr, li->async_task, { + struct sockaddr_storage name; + int name_len = sizeof(struct sockaddr_storage); { MVMOSHandle *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO); @@ -714,8 +815,8 @@ static void on_connection(uv_stream_t *server, int status) { MVM_repr_push_o(tc, arr, (MVMObject *)result); MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - uv_tcp_getpeername(client, (struct sockaddr *)&sockaddr, &name_len); - push_name_and_port(tc, &sockaddr, arr); + uv_tcp_getpeername(client, (struct sockaddr *)&name, &name_len); + push_name_and_port(tc, &name, arr); } { @@ -727,101 +828,122 @@ static void on_connection(uv_stream_t *server, int status) { MVM_repr_push_o(tc, arr, (MVMObject *)result); - uv_tcp_getsockname(client, (struct sockaddr *)&sockaddr, &name_len); - push_name_and_port(tc, &sockaddr, arr); + uv_tcp_getsockname(client, (struct sockaddr *)&name, &name_len); + push_name_and_port(tc, &name, arr); } }); } else { - uv_close((uv_handle_t*)client, NULL); - MVM_free(client); + /* Error; need to notify. */ MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO); - MVMROOT2(tc, arr, t, { + MVMROOT3(tc, t, arr, li->async_task, { MVMString *msg_str = MVM_string_ascii_decode_nt(tc, tc->instance->VMString, uv_strerror(r)); MVMObject *msg_box = MVM_repr_box_str(tc, tc->instance->boot_types.BOOTStr, msg_str); MVM_repr_push_o(tc, arr, msg_box); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); }); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + + if (client != NULL && !uv_is_closing((uv_handle_t *)client)) + uv_close((uv_handle_t*)client, free_on_close_cb); } + MVM_repr_push_o(tc, t->body.queue, arr); } -/* Sets up a socket listener. */ -static void listen_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { - int r; - - /* Add to work in progress. */ - ListenInfo *li = (ListenInfo *)data; - li->tc = tc; - li->work_idx = MVM_io_eventloop_add_active_work(tc, async_task); - - /* Create and initialize socket and connection, and start listening. */ - li->socket = MVM_malloc(sizeof(uv_tcp_t)); - li->socket->data = data; - if ((r = uv_tcp_init(loop, li->socket)) < 0 || - (r = uv_tcp_bind(li->socket, li->dest, 0)) < 0 || - (r = uv_listen((uv_stream_t *)li->socket, li->backlog, on_connection))) { - /* Error; need to notify. 
*/ - MVMROOT(tc, async_task, { - MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); - MVM_repr_push_o(tc, arr, ((MVMAsyncTask *)async_task)->body.schedulee); +static void do_listen_setup(uv_handle_t *handle) { + ListenInfo *li = (ListenInfo *)handle->data; + MVMThreadContext *tc = li->tc; + MVMAsyncTask *t = (MVMAsyncTask *)li->async_task; + MVMObject *arr; + + for (; li->cur_record != NULL; li->cur_record = li->cur_record->ai_next) { + if ((li->error = uv_tcp_init(li->loop, li->socket)) != 0) + continue; + + if ((li->error = uv_tcp_bind(li->socket, li->cur_record->ai_addr, 0)) != 0 || + (li->error = uv_listen((uv_stream_t *)li->socket, li->backlog, on_connection) != 0)) { + /* Error; try the rest of the addresses, if any, before throwing. */ + li->cur_record = li->cur_record->ai_next; + uv_close(handle, do_listen_setup); + } else { + /* Success; allocate our handle. */ + MVMROOT(tc, t, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); + MVM_repr_push_o(tc, arr, t->body.schedulee); MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO); - MVMROOT(tc, arr, { - MVMString *msg_str = MVM_string_ascii_decode_nt(tc, - tc->instance->VMString, uv_strerror(r)); - MVMObject *msg_box = MVM_repr_box_str(tc, - tc->instance->boot_types.BOOTStr, msg_str); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + MVMROOT2(tc, t, arr, { + MVMOSHandle *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO); + MVMIOAsyncSocketData *data = MVM_calloc(1, sizeof(MVMIOAsyncSocketData)); + data->handle = (uv_stream_t *)li->socket; + result->body.ops = &op_table; + result->body.data = data; + MVM_repr_push_o(tc, arr, (MVMObject *)result); - MVM_repr_push_o(tc, arr, msg_box); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + { + struct sockaddr_storage name; + int name_len = sizeof(struct sockaddr_storage); + uv_tcp_getsockname(li->socket, (struct sockaddr *)&name, &name_len); + push_name_and_port(tc, &name, arr); + } }); - MVM_repr_push_o(tc, ((MVMAsyncTask *)async_task)->body.queue, arr); - }); - uv_close((uv_handle_t *)li->socket, free_on_close_cb); - li->socket = NULL; - MVM_io_eventloop_remove_active_work(tc, &(li->work_idx)); + MVM_repr_push_o(tc, t->body.queue, arr); + } + return; } - { - MVMObject *arr; - struct sockaddr_storage sockaddr; - int name_len = sizeof(struct sockaddr_storage); - - MVMROOT(tc, async_task, { - arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + /* Error; no addresses could be used, so we need to notify. 
*/ + MVMROOT(tc, t, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); + MVM_repr_push_o(tc, arr, t->body.schedulee); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO); + MVMROOT2(tc, t, arr, { + MVMString *msg_str = MVM_string_ascii_decode_nt(tc, + tc->instance->VMString, uv_strerror(li->error)); + MVMObject *msg_box = MVM_repr_box_str(tc, + tc->instance->boot_types.BOOTStr, msg_str); + MVM_repr_push_o(tc, arr, msg_box); + }); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + MVM_repr_push_o(tc, t->body.queue, arr); - MVM_repr_push_o(tc, arr, ((MVMAsyncTask *)async_task)->body.schedulee); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + if (handle != NULL && !uv_is_closing(handle)) { + li->socket = NULL; + uv_close(handle, free_on_close_cb); + } - MVMROOT(tc, arr, { - MVMOSHandle *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO); - MVMIOAsyncSocketData *data = MVM_calloc(1, sizeof(MVMIOAsyncSocketData)); - data->handle = (uv_stream_t *)li->socket; - result->body.ops = &op_table; - result->body.data = data; + MVM_io_eventloop_remove_active_work(tc, &(li->work_idx)); + return; +} +/* Sets up a socket listener. */ +static void listen_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { + /* Add to work in progress. */ + ListenInfo *li = (ListenInfo *)data; + li->tc = tc; + li->loop = loop; + li->async_task = async_task; + li->work_idx = MVM_io_eventloop_add_active_work(tc, async_task); - MVM_repr_push_o(tc, arr, (MVMObject *)result); + /* Create and initialize socket and connection, and start listening. */ + li->socket = MVM_malloc(sizeof(uv_tcp_t)); + li->socket->data = data; - uv_tcp_getsockname(li->socket, (struct sockaddr *)&sockaddr, &name_len); - push_name_and_port(tc, &sockaddr, arr); - }); - MVM_repr_push_o(tc, ((MVMAsyncTask *)async_task)->body.queue, arr); - }); - } + do_listen_setup((uv_handle_t *)li->socket); } /* Stops listening. */ @@ -834,18 +956,24 @@ static void on_listen_cancelled(uv_handle_t *handle) { } static void listen_cancel(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { ListenInfo *li = (ListenInfo *)data; - if (li->socket) { + if (li->socket != NULL) { uv_close((uv_handle_t *)li->socket, on_listen_cancelled); li->socket = NULL; } } +/* Marks objects for a listen task. */ +static void listen_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) { + ListenInfo *li = (ListenInfo *)data; + MVM_gc_worklist_add(tc, worklist, &li->async_task); +} + /* Frees info for a listen task. 
*/ static void listen_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) { - if (data) { + if (data != NULL) { ListenInfo *li = (ListenInfo *)data; - if (li->dest) - MVM_free(li->dest); + if (li->records != NULL) + freeaddrinfo(li->records); MVM_free(li); } } @@ -855,7 +983,7 @@ static const MVMAsyncTaskOps listen_op_table = { listen_setup, NULL, listen_cancel, - NULL, + listen_gc_mark, listen_gc_free }; @@ -863,9 +991,9 @@ static const MVMAsyncTaskOps listen_op_table = { MVMObject * MVM_io_socket_listen_async(MVMThreadContext *tc, MVMObject *queue, MVMObject *schedulee, MVMString *host, MVMint64 port, MVMint32 backlog, MVMObject *async_type) { - MVMAsyncTask *task; - ListenInfo *li; - struct sockaddr *dest; + MVMAsyncTask *task; + ListenInfo *li; + struct addrinfo *records; /* Validate REPRs. */ if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue) @@ -875,25 +1003,24 @@ MVMObject * MVM_io_socket_listen_async(MVMThreadContext *tc, MVMObject *queue, MVM_exception_throw_adhoc(tc, "asynclisten result type must have REPR AsyncTask"); - /* Resolve hostname. (Could be done asynchronously too.) */ - MVMROOT3(tc, queue, schedulee, async_type, { - dest = MVM_io_resolve_host_name(tc, host, port, SOCKET_FAMILY_UNSPEC); + MVMROOT4(tc, queue, schedulee, host, async_type, { + /* Resolve hostname. (Could be done asynchronously too.) */ + records = MVM_io_resolve_host_name(tc, host, port, SOCKET_FAMILY_UNSPEC, SOCKET_TYPE_STREAM, SOCKET_PROTOCOL_TCP, 1); + /* Create async task handle. */ + task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type); }); - /* Create async task handle. */ - MVMROOT2(tc, queue, schedulee, { - task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type); - }); MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue); MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee); - task->body.ops = &listen_op_table; li = MVM_calloc(1, sizeof(ListenInfo)); - li->dest = dest; li->backlog = backlog; + li->records = records; + li->cur_record = records; task->body.data = li; + task->body.ops = &listen_op_table; /* Hand the task off to the event loop. */ - MVMROOT(tc, task, { + MVMROOT5(tc, queue, schedulee, host, async_type, task, { MVM_io_eventloop_queue_work(tc, (MVMObject *)task); }); diff --git a/src/io/asyncsocketudp.c b/src/io/asyncsocketudp.c index 73e48fd71d..877a8ca63b 100644 --- a/src/io/asyncsocketudp.c +++ b/src/io/asyncsocketudp.c @@ -6,16 +6,16 @@ /* Data that we keep for an asynchronous UDP socket handle. */ typedef struct { /* The libuv handle to the socket. */ - uv_udp_t *handle; + uv_udp_t *handle; } MVMIOAsyncUDPSocketData; /* Info we convey about a read task. */ typedef struct { + MVMThreadContext *tc; + int work_idx; MVMOSHandle *handle; MVMObject *buf_type; int seq_number; - MVMThreadContext *tc; - int work_idx; } ReadInfo; /* Allocates a buffer of the suggested size. */ @@ -32,128 +32,140 @@ static void free_on_close_cb(uv_handle_t *handle) { /* XXX this is duplicated from asyncsocket.c; put it in some shared file */ static void push_name_and_port(MVMThreadContext *tc, struct sockaddr_storage *name, MVMObject *arr) { - char addrstr[INET6_ADDRSTRLEN + 1]; + char addrstr[INET6_ADDRSTRLEN + 1]; /* XXX windows support kludge. 64 bit is much too big, but we'll * get the proper data from the struct anyway, however windows * decides to declare it. 
*/ MVMuint64 port; - MVMObject *host_o; - MVMObject *port_o; - if (name) { - switch (name->ss_family) { - case AF_INET6: { - uv_ip6_name((struct sockaddr_in6*)name, addrstr, INET6_ADDRSTRLEN + 1); - port = ntohs(((struct sockaddr_in6*)name)->sin6_port); - break; - } - case AF_INET: { - uv_ip4_name((struct sockaddr_in*)name, addrstr, INET6_ADDRSTRLEN + 1); - port = ntohs(((struct sockaddr_in*)name)->sin_port); - break; - } - default: - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - return; - break; + + if (name == NULL) + goto error; + + switch (name->ss_family) { + case AF_INET6: { + struct sockaddr_in6 *addr = (struct sockaddr_in6 *)name; + uv_ip6_name(addr, addrstr, INET6_ADDRSTRLEN + 1); + port = ntohs(addr->sin6_port); + break; } - MVMROOT(tc, arr, { + case AF_INET: { + struct sockaddr_in *addr = (struct sockaddr_in *)name; + uv_ip4_name(addr, addrstr, INET6_ADDRSTRLEN + 1); + port = ntohs(addr->sin_port); + break; + } + default: + goto error; + } + + MVMROOT(tc, arr, { + MVMObject *host_o = (MVMObject *)MVM_repr_box_str(tc, tc->instance->boot_types.BOOTStr, + MVM_string_ascii_decode_nt(tc, tc->instance->VMString, addrstr)); + MVMObject *port_o; + MVM_repr_push_o(tc, arr, host_o); + MVMROOT(tc, host_o, { port_o = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, port); - MVMROOT(tc, port_o, { - host_o = (MVMObject *)MVM_repr_box_str(tc, tc->instance->boot_types.BOOTStr, - MVM_string_ascii_decode_nt(tc, tc->instance->VMString, addrstr)); - }); }); - } else { - host_o = tc->instance->boot_types.BOOTStr; - port_o = tc->instance->boot_types.BOOTInt; - } - MVM_repr_push_o(tc, arr, host_o); - MVM_repr_push_o(tc, arr, port_o); + MVM_repr_push_o(tc, arr, port_o); + }); + return; + +error: + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + return; } /* Read handler. */ static void on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { - ReadInfo *ri = (ReadInfo *)handle->data; - MVMThreadContext *tc = ri->tc; - MVMObject *arr; + ReadInfo *ri = (ReadInfo *)handle->data; + MVMThreadContext *tc = ri->tc; MVMAsyncTask *t; + MVMObject *arr; /* libuv will call on_read once after all datagram read operations * to "give us back a buffer". in that case, nread and addr are NULL. * This is an artifact of the underlying implementation and we shouldn't * pass it through to the user. */ - if (nread == 0 && addr == NULL) return; - arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); t = MVM_io_eventloop_get_active_work(tc, ri->work_idx); - + MVMROOT(tc, t, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); MVM_repr_push_o(tc, arr, t->body.schedulee); if (nread >= 0) { MVMROOT2(tc, t, arr, { - MVMArray *res_buf; - - /* Push the sequence number. */ + /* Success; start by pushing the sequence number, then produce a + * buffer an push that as well. */ MVMObject *seq_boxed = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, ri->seq_number++); - MVM_repr_push_o(tc, arr, seq_boxed); - - /* Produce a buffer and push it. 
*/ - res_buf = (MVMArray *)MVM_repr_alloc_init(tc, ri->buf_type); + MVMArray *res_buf = (MVMArray *)MVM_repr_alloc_init(tc, ri->buf_type); res_buf->body.slots.i8 = (MVMint8 *)buf->base; res_buf->body.start = 0; res_buf->body.ssize = buf->len; res_buf->body.elems = nread; + MVM_repr_push_o(tc, arr, seq_boxed); MVM_repr_push_o(tc, arr, (MVMObject *)res_buf); + }); - /* next, no error. */ - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + /* Next, no error... */ + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - /* and finally, address and port */ + /* ...and finally, address and port. */ + MVMROOT2(tc, t, arr, { push_name_and_port(tc, (struct sockaddr_storage *)addr, arr); }); + + MVM_repr_push_o(tc, t->body.queue, arr); } - else if (nread == UV_EOF) { - MVMROOT2(tc, t, arr, { - MVMObject *final = MVM_repr_box_int(tc, - tc->instance->boot_types.BOOTInt, ri->seq_number); - MVM_repr_push_o(tc, arr, final); + else { + if (nread == UV_EOF) { + /* End of file; push bytes read. */ + MVMROOT2(tc, t, arr, { + MVMObject *final = MVM_repr_box_int(tc, + tc->instance->boot_types.BOOTInt, ri->seq_number); + MVM_repr_push_o(tc, arr, final); + }); MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - }); - if (buf->base) - MVM_free(buf->base); - uv_udp_recv_stop(handle); - MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx)); - } - else { - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVMROOT2(tc, t, arr, { - MVMString *msg_str = MVM_string_ascii_decode_nt(tc, - tc->instance->VMString, uv_strerror(nread)); - MVMObject *msg_box = MVM_repr_box_str(tc, - tc->instance->boot_types.BOOTStr, msg_str); - MVM_repr_push_o(tc, arr, msg_box); + } + else { + /* Error; push the error message. */ + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVMROOT2(tc, t, arr, { + MVMString *msg_str = MVM_string_ascii_decode_nt(tc, + tc->instance->VMString, uv_strerror(nread)); + MVMObject *msg_box = MVM_repr_box_str(tc, + tc->instance->boot_types.BOOTStr, msg_str); + MVM_repr_push_o(tc, arr, msg_box); + }); MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - }); - if (buf->base) + } + + MVM_repr_push_o(tc, t->body.queue, arr); + + /* Clean up. */ + if (handle != NULL && !uv_is_closing((uv_handle_t *)handle)) + uv_udp_recv_stop(handle); + if (buf->base != NULL) MVM_free(buf->base); - uv_udp_recv_stop(handle); + MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx)); } - MVM_repr_push_o(tc, t->body.queue, arr); } /* Does setup work for setting up asynchronous reads. */ static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { MVMIOAsyncUDPSocketData *handle_data; - int r; + MVMAsyncTask *t; + MVMObject *arr; + int r; /* Add to work in progress. */ ReadInfo *ri = (ReadInfo *)data; @@ -161,38 +173,40 @@ static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_t ri->work_idx = MVM_io_eventloop_add_active_work(tc, async_task); /* Start reading the stream. 
*/ - handle_data = (MVMIOAsyncUDPSocketData *)ri->handle->body.data; + handle_data = (MVMIOAsyncUDPSocketData *)ri->handle->body.data; handle_data->handle->data = data; - if ((r = uv_udp_recv_start(handle_data->handle, on_alloc, on_read)) < 0) { - /* Error; need to notify. */ - MVMROOT(tc, async_task, { - MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); - MVMAsyncTask *t = (MVMAsyncTask *)async_task; - MVM_repr_push_o(tc, arr, t->body.schedulee); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); - MVMROOT(tc, arr, { - MVMString *msg_str = MVM_string_ascii_decode_nt(tc, - tc->instance->VMString, uv_strerror(r)); - MVMObject *msg_box = MVM_repr_box_str(tc, - tc->instance->boot_types.BOOTStr, msg_str); - MVM_repr_push_o(tc, arr, msg_box); - }); - MVM_repr_push_o(tc, t->body.queue, arr); - }); - } + if ((r = uv_udp_recv_start(handle_data->handle, on_alloc, on_read)) == 0) + /* Success; finish up in on_read. */ + return; + + /* Error; need to notify. */ + t = (MVMAsyncTask *)async_task; + MVMROOT(tc, t, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); + MVM_repr_push_o(tc, arr, t->body.schedulee); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); + MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); + MVMROOT2(tc, t, arr, { + MVMString *msg_str = MVM_string_ascii_decode_nt(tc, + tc->instance->VMString, uv_strerror(r)); + MVMObject *msg_box = MVM_repr_box_str(tc, + tc->instance->boot_types.BOOTStr, msg_str); + MVM_repr_push_o(tc, arr, msg_box); + }); + MVM_repr_push_o(tc, t->body.queue, arr); } /* Marks objects for a read task. */ static void read_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) { ReadInfo *ri = (ReadInfo *)data; - MVM_gc_worklist_add(tc, worklist, &ri->buf_type); MVM_gc_worklist_add(tc, worklist, &ri->handle); + MVM_gc_worklist_add(tc, worklist, &ri->buf_type); } /* Frees info for a read task. */ static void read_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) { - if (data) + if (data != NULL) MVM_free(data); } @@ -208,7 +222,7 @@ static const MVMAsyncTaskOps read_op_table = { static MVMAsyncTask * read_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue, MVMObject *schedulee, MVMObject *buf_type, MVMObject *async_type) { MVMAsyncTask *task; - ReadInfo *ri; + ReadInfo *ri; /* Validate REPRs. */ if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue) @@ -228,19 +242,19 @@ static MVMAsyncTask * read_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject } /* Create async task handle. */ - MVMROOT4(tc, queue, schedulee, h, buf_type, { + MVMROOT5(tc, h, queue, schedulee, buf_type, async_type, { task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type); }); MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue); MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee); - task->body.ops = &read_op_table; ri = MVM_calloc(1, sizeof(ReadInfo)); - MVM_ASSIGN_REF(tc, &(task->common.header), ri->buf_type, buf_type); MVM_ASSIGN_REF(tc, &(task->common.header), ri->handle, h); + MVM_ASSIGN_REF(tc, &(task->common.header), ri->buf_type, buf_type); task->body.data = ri; + task->body.ops = &read_op_table; - /* Hand the task off to the event loop. */ - MVMROOT(tc, task, { + /* Hand the task off to the event loop. 
*/ + MVMROOT6(tc, h, queue, schedulee, buf_type, async_type, task, { MVM_io_eventloop_queue_work(tc, (MVMObject *)task); }); @@ -249,24 +263,31 @@ static MVMAsyncTask * read_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject /* Info we convey about a write task. */ typedef struct { + MVMThreadContext *tc; + MVMObject *async_task; + int work_idx; MVMOSHandle *handle; MVMObject *buf_data; + struct addrinfo *records; + struct addrinfo *cur_record; uv_udp_send_t *req; uv_buf_t buf; - MVMThreadContext *tc; - int work_idx; - struct sockaddr *dest_addr; + int error; } WriteInfo; /* Completion handler for an asynchronous write. */ static void on_write(uv_udp_send_t *req, int status) { - WriteInfo *wi = (WriteInfo *)req->data; - MVMThreadContext *tc = wi->tc; - MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); - MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, wi->work_idx); + WriteInfo *wi = (WriteInfo *)req->data; + MVMThreadContext *tc = wi->tc; + MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, wi->work_idx); + MVMObject *arr; + + MVMROOT3(tc, t, wi->handle, wi->buf_data, { + arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); + }); MVM_repr_push_o(tc, arr, t->body.schedulee); - if (status >= 0) { - MVMROOT2(tc, arr, t, { + if (status == 0) { + MVMROOT4(tc, t, arr, wi->handle, wi->buf_data, { MVMObject *bytes_box = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, wi->buf.len); @@ -276,7 +297,7 @@ static void on_write(uv_udp_send_t *req, int status) { } else { MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); - MVMROOT2(tc, arr, t, { + MVMROOT4(tc, t, arr, wi->handle, wi->buf_data, { MVMString *msg_str = MVM_string_ascii_decode_nt(tc, tc->instance->VMString, uv_strerror(status)); MVMObject *msg_box = MVM_repr_box_str(tc, @@ -285,58 +306,73 @@ static void on_write(uv_udp_send_t *req, int status) { }); } MVM_repr_push_o(tc, t->body.queue, arr); + + /* Clean up. */ MVM_free(wi->req); + MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx)); } /* Does setup work for an asynchronous write. */ static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { - MVMIOAsyncUDPSocketData *handle_data; MVMArray *buffer; char *output; - int output_size, r; + int output_size; + MVMIOAsyncUDPSocketData *handle_data; + uv_handle_t *handle; + MVMAsyncTask *t; + MVMObject *arr; + int e; /* Add to work in progress. */ - WriteInfo *wi = (WriteInfo *)data; - wi->tc = tc; - wi->work_idx = MVM_io_eventloop_add_active_work(tc, async_task); + WriteInfo *wi = (WriteInfo *)data; + wi->tc = tc; + wi->async_task = async_task; + wi->work_idx = MVM_io_eventloop_add_active_work(tc, async_task); /* Extract buf data. */ - buffer = (MVMArray *)wi->buf_data; - output = (char *)(buffer->body.slots.i8 + buffer->body.start); + buffer = (MVMArray *)wi->buf_data; + output = (char *)(buffer->body.slots.i8 + buffer->body.start); output_size = (int)buffer->body.elems; /* Create and initialize write request. */ - wi->req = MVM_malloc(sizeof(uv_udp_send_t)); - wi->buf = uv_buf_init(output, output_size); - wi->req->data = data; - handle_data = (MVMIOAsyncUDPSocketData *)wi->handle->body.data; - - if (uv_is_closing((uv_handle_t *)handle_data->handle)) - MVM_exception_throw_adhoc(tc, "cannot write to a closed socket"); - - if ((r = uv_udp_send(wi->req, handle_data->handle, &(wi->buf), 1, wi->dest_addr, on_write)) < 0) { - /* Error; need to notify. 
-        MVMROOT(tc, async_task, {
-            MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
-            MVMAsyncTask *t = (MVMAsyncTask *)async_task;
-            MVM_repr_push_o(tc, arr, t->body.schedulee);
-            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
-            MVMROOT(tc, arr, {
-                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
-                    tc->instance->VMString, uv_strerror(r));
-                MVMObject *msg_box = MVM_repr_box_str(tc,
-                    tc->instance->boot_types.BOOTStr, msg_str);
-                MVM_repr_push_o(tc, arr, msg_box);
-            });
-            MVM_repr_push_o(tc, t->body.queue, arr);
-        });
+    wi->req = MVM_malloc(sizeof(uv_udp_send_t));
+    wi->req->data = wi;
+    wi->buf = uv_buf_init(output, output_size);
+
+    handle_data = (MVMIOAsyncUDPSocketData *)wi->handle->body.data;
+    handle = (uv_handle_t *)handle_data->handle;
+    if (uv_is_closing(handle))
+        MVM_exception_throw_adhoc(tc, "cannot send over a closed socket");
-        /* Cleanup handle. */
-        MVM_free(wi->req);
-        wi->req = NULL;
-        MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
+    for (; wi->cur_record != NULL; wi->cur_record = wi->cur_record->ai_next) {
+        if ((wi->error = uv_udp_send(wi->req, (uv_udp_t *)handle, &(wi->buf), 1, wi->cur_record->ai_addr, on_write)) == 0)
+            /* Success; finish up in on_write. */
+            return;
+
+        /* Error; try again with the next address, if any, before throwing. */
     }
+
+    /* Error; no addresses could be used, so we need to notify. */
+    t = (MVMAsyncTask *)async_task;
+    MVMROOT(tc, t, {
+        arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
+    });
+    MVM_repr_push_o(tc, arr, t->body.schedulee);
+    MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
+    MVMROOT2(tc, t, arr, {
+        MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
+            tc->instance->VMString, uv_strerror(wi->error));
+        MVMObject *msg_box = MVM_repr_box_str(tc,
+            tc->instance->boot_types.BOOTStr, msg_str);
+        MVM_repr_push_o(tc, arr, msg_box);
+    });
+    MVM_repr_push_o(tc, t->body.queue, arr);
+
+    /* Clean up handle. */
+    MVM_free(wi->req);
+
+    MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
 }
 
 /* Marks objects for a write task. */
@@ -348,10 +384,10 @@ static void write_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *workl
 /* Frees info for a write task. */
 static void write_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
-    if (data) {
+    if (data != NULL) {
         WriteInfo *wi = (WriteInfo *)data;
-        if (wi->dest_addr)
-            MVM_free(wi->dest_addr);
+        if (wi->records != NULL)
+            freeaddrinfo(wi->records);
         MVM_free(data);
     }
 }
@@ -370,7 +406,7 @@ static MVMAsyncTask * write_bytes_to(MVMThreadContext *tc, MVMOSHandle *h, MVMOb
     MVMString *host, MVMint64 port) {
     MVMAsyncTask *task;
     WriteInfo *wi;
-    struct sockaddr *dest_addr;
+    struct addrinfo *records;
 
     /* Validate REPRs. */
     if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
@@ -386,24 +422,25 @@ static MVMAsyncTask * write_bytes_to(MVMThreadContext *tc, MVMOSHandle *h, MVMOb
         MVM_exception_throw_adhoc(tc, "asyncwritebytesto requires a native array of uint8 or int8");
 
     /* Resolve destination and create async task handle. */
-    MVMROOT4(tc, queue, schedulee, h, buffer, {
-        MVMROOT(tc, async_type, {
-            dest_addr = MVM_io_resolve_host_name(tc, host, port, SOCKET_FAMILY_UNSPEC);
-        });
-        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
+    MVMROOT6(tc, h, queue, schedulee, buffer, async_type, host, {
+        records = MVM_io_resolve_host_name(tc, host, port, SOCKET_FAMILY_UNSPEC, SOCKET_TYPE_DGRAM, SOCKET_PROTOCOL_UDP, 0);
+        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
     });
     MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
     MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
-    task->body.ops = &write_op_table;
     wi = MVM_calloc(1, sizeof(WriteInfo));
     MVM_ASSIGN_REF(tc, &(task->common.header), wi->handle, h);
     MVM_ASSIGN_REF(tc, &(task->common.header), wi->buf_data, buffer);
-    wi->dest_addr = dest_addr;
+    wi->records = records;
+    wi->cur_record = records;
     task->body.data = wi;
+    task->body.ops = &write_op_table;
 
     /* Hand the task off to the event loop. */
-    MVMROOT(tc, task, {
-        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
+    MVMROOT6(tc, h, queue, schedulee, buffer, async_type, host, {
+        MVMROOT(tc, task, {
+            MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
+        });
     });
 
     return task;
@@ -413,7 +450,7 @@ static MVMAsyncTask * write_bytes_to(MVMThreadContext *tc, MVMOSHandle *h, MVMOb
 static void close_perform(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
     uv_handle_t *handle = (uv_handle_t *)data;
 
-    if (uv_is_closing(handle))
+    if (handle == NULL || uv_is_closing(handle))
         MVM_exception_throw_adhoc(tc, "cannot close a closed socket");
 
     uv_close(handle, free_on_close_cb);
@@ -430,16 +467,16 @@ static const MVMAsyncTaskOps close_op_table = {
 static MVMint64 close_socket(MVMThreadContext *tc, MVMOSHandle *h) {
     MVMIOAsyncUDPSocketData *data = (MVMIOAsyncUDPSocketData *)h->body.data;
-    MVMAsyncTask *task;
-
+    MVMAsyncTask *task;
     MVMROOT(tc, h, {
         task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTAsync);
     });
     task->body.ops = &close_op_table;
     task->body.data = data->handle;
-    MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
-
+    MVMROOT2(tc, h, task, {
+        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
+    });
     return 0;
 }
@@ -452,8 +489,8 @@ static MVMint64 socket_is_tty(MVMThreadContext *tc, MVMOSHandle *h) {
 static MVMint64 socket_handle(MVMThreadContext *tc, MVMOSHandle *h) {
     MVMIOAsyncUDPSocketData *data = (MVMIOAsyncUDPSocketData *)h->body.data;
     uv_handle_t *handle = (uv_handle_t *)data->handle;
-    int fd;
-    uv_os_fd_t fh;
+    int fd;
+    uv_os_fd_t fh;
 
     uv_fileno(handle, &fh);
     fd = uv_open_osfhandle(fh);
@@ -485,68 +522,101 @@ static const MVMIOOps op_table = {
 /* Info we convey about a socket setup task. */
 typedef struct {
-    struct sockaddr *bind_addr;
+    MVMThreadContext *tc;
+    uv_loop_t *loop;
+    MVMObject *async_task;
+    uv_udp_t *handle;
+    int bind;
+    struct addrinfo *records;
+    struct addrinfo *cur_record;
     MVMint64 flags;
+    int error;
 } SocketSetupInfo;
 
-/* Initilalize the UDP socket on the event loop. */
-static void setup_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
-    /* Set up the UDP handle. */
-    SocketSetupInfo *ssi = (SocketSetupInfo *)data;
-    uv_udp_t *udp_handle = MVM_malloc(sizeof(uv_udp_t));
-    int r;
-    if ((r = uv_udp_init(loop, udp_handle)) >= 0) {
-        if (ssi->bind_addr)
-            r = uv_udp_bind(udp_handle, ssi->bind_addr, 0);
-        if (r >= 0 && (ssi->flags & 1))
-            r = uv_udp_set_broadcast(udp_handle, 1);
+/* Does the actual work of initializing the UDP socket on the event loop. */
+static void do_setup_setup(uv_handle_t *handle) {
+    SocketSetupInfo *ssi = (SocketSetupInfo *)handle->data;
+    MVMThreadContext *tc = ssi->tc;
+    MVMAsyncTask *t = (MVMAsyncTask *)ssi->async_task;
+    MVMObject *arr;
+
+    if (ssi->bind) {
+        if (ssi->cur_record == NULL) {
+            /* Clean up. */
+            handle->data = NULL;
+            MVM_free(handle);
+        } else {
+            /* Create and bind the socket. */
+            ssi->error = uv_udp_init(ssi->loop, ssi->handle);
+            if (ssi->error == 0 && ssi->flags & 1)
+                ssi->error = uv_udp_set_broadcast(ssi->handle, 1);
+            if (ssi->error == 0)
+                ssi->error = uv_udp_bind(ssi->handle, ssi->cur_record->ai_addr, 0);
+
+            if (ssi->error != 0) {
+                /* Error; try again with the next address, if any, before throwing. */
+                ssi->cur_record = ssi->cur_record->ai_next;
+                uv_close(handle, do_setup_setup);
+                return;
+            }
+        }
+    } else {
+        /* Create the socket. */
+        ssi->error = uv_udp_init(ssi->loop, ssi->handle);
+        if (ssi->error == 0 && ssi->flags & 1)
+            ssi->error = uv_udp_set_broadcast(ssi->handle, 1);
     }
-    if (r >= 0) {
+    MVMROOT(tc, t, {
+        arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
+    });
+    MVM_repr_push_o(tc, arr, t->body.schedulee);
+    if (ssi->error == 0) {
         /* UDP handle initialized; wrap it up in an I/O handle and send. */
-        MVMAsyncTask *t = (MVMAsyncTask *)async_task;
-        MVMObject *arr;
-        MVMROOT(tc, t, {
-            arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
-            MVM_repr_push_o(tc, arr, t->body.schedulee);
-            MVMROOT(tc, arr, {
-                MVMOSHandle *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO);
-                MVMIOAsyncUDPSocketData *data = MVM_calloc(1, sizeof(MVMIOAsyncUDPSocketData));
-                data->handle = udp_handle;
-                result->body.ops = &op_table;
-                result->body.data = data;
-                MVM_repr_push_o(tc, arr, (MVMObject *)result);
-            });
-            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
+        MVMROOT2(tc, t, arr, {
+            MVMOSHandle *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO);
+            MVMIOAsyncUDPSocketData *data = MVM_calloc(1, sizeof(MVMIOAsyncUDPSocketData));
+            data->handle = ssi->handle;
+            result->body.ops = &op_table;
+            result->body.data = data;
+            MVM_repr_push_o(tc, arr, (MVMObject *)result);
         });
-        MVM_repr_push_o(tc, t->body.queue, arr);
+        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
     }
     else {
-        /* Something failed; need to notify. */
-        MVMROOT(tc, async_task, {
-            MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
-            MVMAsyncTask *t = (MVMAsyncTask *)async_task;
-            MVM_repr_push_o(tc, arr, t->body.schedulee);
-            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
-            MVMROOT2(tc, arr, t, {
-                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
-                    tc->instance->VMString, uv_strerror(r));
-                MVMObject *msg_box = MVM_repr_box_str(tc,
-                    tc->instance->boot_types.BOOTStr, msg_str);
-                MVM_repr_push_o(tc, arr, msg_box);
-            });
-            MVM_repr_push_o(tc, t->body.queue, arr);
-            uv_close((uv_handle_t *)udp_handle, free_on_close_cb);
+        /* Error; no addresses could be used, so we need to notify. */
+        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
+        MVMROOT2(tc, t, arr, {
+            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
+                tc->instance->VMString, uv_strerror(ssi->error));
+            MVMObject *msg_box = MVM_repr_box_str(tc,
+                tc->instance->boot_types.BOOTStr, msg_str);
+            MVM_repr_push_o(tc, arr, msg_box);
+        });
     }
+    MVM_repr_push_o(tc, t->body.queue, arr);
+}
+
+
+/* Initialize the UDP socket on the event loop. */
+static void setup_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
     /* Set up the UDP handle. */
     SocketSetupInfo *ssi = (SocketSetupInfo *)data;
+    ssi->handle = MVM_malloc(sizeof(uv_udp_t));
+    ssi->handle->data = ssi;
+    ssi->tc = tc;
+    ssi->loop = loop;
+    ssi->async_task = async_task;
+
+    do_setup_setup((uv_handle_t *)ssi->handle);
 }
 
 /* Frees info for a connection task. */
 static void setup_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
-    if (data) {
+    if (data != NULL) {
         SocketSetupInfo *ssi = (SocketSetupInfo *)data;
-        if (ssi->bind_addr)
-            MVM_free(ssi->bind_addr);
+        if (ssi->records != NULL)
+            freeaddrinfo(ssi->records);
         MVM_free(ssi);
     }
 }
@@ -565,9 +635,10 @@ MVMObject * MVM_io_socket_udp_async(MVMThreadContext *tc, MVMObject *queue,
     MVMObject *schedulee, MVMString *host, MVMint64 port, MVMint64 flags, MVMObject *async_type) {
+    int bind = (host != NULL && IS_CONCRETE(host));
+    struct addrinfo *records = NULL;
     MVMAsyncTask *task;
     SocketSetupInfo *ssi;
-    struct sockaddr *bind_addr = NULL;
 
     /* Validate REPRs. */
     if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
@@ -577,27 +648,26 @@ MVMObject * MVM_io_socket_udp_async(MVMThreadContext *tc, MVMObject *queue,
         MVM_exception_throw_adhoc(tc, "asyncudp result type must have REPR AsyncTask");
 
-    /* Resolve hostname. (Could be done asynchronously too.) */
-    if (host && IS_CONCRETE(host)) {
-        MVMROOT3(tc, queue, schedulee, async_type, {
-            bind_addr = MVM_io_resolve_host_name(tc, host, port, SOCKET_FAMILY_UNSPEC);
-        });
-    }
+    MVMROOT4(tc, queue, schedulee, host, async_type, {
+        /* Resolve hostname. (Could be done asynchronously too.) */
+        if (bind)
+            records = MVM_io_resolve_host_name(tc, host, port, SOCKET_FAMILY_UNSPEC, SOCKET_TYPE_DGRAM, SOCKET_PROTOCOL_UDP, 1);
-    /* Create async task handle. */
-    MVMROOT2(tc, queue, schedulee, {
+        /* Create async task handle. */
         task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
     });
     MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
     MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
     task->body.ops = &setup_op_table;
     ssi = MVM_calloc(1, sizeof(SocketSetupInfo));
-    ssi->bind_addr = bind_addr;
+    ssi->bind = bind;
+    ssi->records = records;
+    ssi->cur_record = records;
     ssi->flags = flags;
     task->body.data = ssi;
 
     /* Hand the task off to the event loop. */
-    MVMROOT(tc, task, {
+    MVMROOT5(tc, queue, schedulee, host, async_type, task, {
         MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
     });
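Editorial note on the do_setup_setup flow above (the sketch is illustrative, not MoarVM API): libuv will not let a handle that has been through uv_udp_init be reinitialised until it has been closed, so when binding to one resolved address fails, the new code closes the handle and passes do_setup_setup itself as the close callback, retrying with the next addrinfo record from there. A stripped-down version of that control flow, assuming libuv and getaddrinfo results only (BindAttempt and try_bind are hypothetical names):

    #include <uv.h>
    #include <netdb.h>
    #include <stdlib.h>

    typedef struct {
        uv_loop_t       *loop;
        struct addrinfo *cur;   /* next address to try */
        int              error; /* last libuv error code */
    } BindAttempt;

    static void try_bind(uv_handle_t *handle) {
        BindAttempt *ba  = (BindAttempt *)handle->data;
        uv_udp_t    *udp = (uv_udp_t *)handle;

        if (ba->cur == NULL) {
            /* Ran out of addresses; report ba->error and free the handle. */
            free(handle);
            return;
        }

        ba->error = uv_udp_init(ba->loop, udp);
        if (ba->error == 0)
            ba->error = uv_udp_bind(udp, ba->cur->ai_addr, 0);
        if (ba->error == 0)
            return;                 /* bound; hand the handle on */

        /* Bind failed: advance to the next record and retry once the
         * handle has been closed, from the close callback. */
        ba->cur = ba->cur->ai_next;
        uv_close(handle, try_bind);
    }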
diff --git a/src/io/syncsocket.c b/src/io/syncsocket.c
index c471687e38..4fb14a7d1e 100644
--- a/src/io/syncsocket.c
+++ b/src/io/syncsocket.c
@@ -27,10 +27,10 @@
 #define PACKET_SIZE 65535
 
 /* Error handling varies between POSIX and WinSock. */
-MVM_NO_RETURN static void throw_error(MVMThreadContext *tc, int r, char *operation) MVM_NO_RETURN_ATTRIBUTE;
+MVM_NO_RETURN static void throw_error(MVMThreadContext *tc, int r, const char *operation) MVM_NO_RETURN_ATTRIBUTE;
 #ifdef _WIN32
     #define MVM_IS_SOCKET_ERROR(x) ((x) == SOCKET_ERROR)
-    static void throw_error(MVMThreadContext *tc, int r, char *operation) {
+    static void throw_error(MVMThreadContext *tc, int r, const char *operation) {
         int error = WSAGetLastError();
         LPTSTR error_string = NULL;
         if (FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM,
@@ -42,7 +42,7 @@ MVM_NO_RETURN static void throw_error(MVMThreadContext *tc, int r, char *operati
     }
 #else
     #define MVM_IS_SOCKET_ERROR(x) ((x) < 0)
-    static void throw_error(MVMThreadContext *tc, int r, char *operation) {
+    static void throw_error(MVMThreadContext *tc, int r, const char *operation) {
         MVM_exception_throw_adhoc(tc, "Could not %s: %s", operation, strerror(errno));
     }
 #endif
@@ -277,36 +277,35 @@ static size_t get_struct_size_for_family(sa_family_t family) {
  *
  *  SOCKET_FAMILY_UNIX = 3
  *      Unix domain socket, will spawn a sockaddr_un which will use the given host as path
- *      e.g: MVM_io_resolve_host_name(tc, "/run/moarvm.sock", 0, SOCKET_FAMILY_UNIX)
+ *      e.g: MVM_io_resolve_host_name(tc, "/run/moarvm.sock", 0, SOCKET_FAMILY_UNIX, 0, 0, 0)
+ *      will spawn a unix domain socket on /run/moarvm.sock
+ *
+ *  TODO: proper support for the types and protocols that can be passed to this.
+ *
+ *  Passing passive as 1 will look up hostnames for sockets intended for
+ *  bind(2), and passing it as 0 will look them up for those intended for
+ *  connect(2).
  */
-struct sockaddr * MVM_io_resolve_host_name(MVMThreadContext *tc, MVMString *host, MVMint64 port, MVMuint16 family) {
-    char *host_cstr = MVM_string_utf8_encode_C_string(tc, host);
-    struct sockaddr *dest;
-    int error;
+struct addrinfo * MVM_io_resolve_host_name(
+    MVMThreadContext *tc,
+    MVMString *host,
+    MVMint64 port,
+    MVMuint16 family,
+    MVMint64 type,
+    MVMint64 protocol,
+    MVMint64 passive
+) {
+    char *host_cstr = MVM_string_utf8_encode_C_string(tc, host);
+    char port_cstr[8];
+    int error;
+    struct addrinfo hints;
     struct addrinfo *result;
-    char port_cstr[8];
-    struct addrinfo hints;
-
-#ifndef _WIN32
-    if (family == SOCKET_FAMILY_UNIX) {
-        struct sockaddr_un *result_un = MVM_malloc(sizeof(struct sockaddr_un));
-
-        MVMuint64 host_len = strlen(host_cstr);
-        if (host_len > 107) {
-            char *waste[] = { host_cstr, NULL };
-            MVM_free(result_un);
-            MVM_exception_throw_adhoc_free(tc, waste, "Socket path '%s' is %"PRIu64" characters, max allowed is 107", host_cstr, host_len);
-        }
-        result_un->sun_family = AF_UNIX;
-        strcpy(result_un->sun_path, host_cstr);
-        MVM_free(host_cstr);
-
-        return (struct sockaddr *)result_un;
-    }
-#endif
+    memset(&hints, 0, sizeof(struct addrinfo));
+    hints.ai_flags = passive
+        ? (AI_ADDRCONFIG | AI_NUMERICSERV | AI_PASSIVE)
+        : (AI_ADDRCONFIG | AI_NUMERICSERV);
 
     switch (family) {
         case SOCKET_FAMILY_UNSPEC:
@@ -318,72 +317,141 @@ struct sockaddr * MVM_io_resolve_host_name(MVMThreadContext *tc, MVMString *host
         case SOCKET_FAMILY_INET6:
             hints.ai_family = AF_INET6;
             break;
-        case SOCKET_FAMILY_UNIX:
-            hints.ai_family = AF_UNIX;
-            break;
+        case SOCKET_FAMILY_UNIX: {
+            /* XXX: this is pretty hacky... */
+#ifdef _WIN32
+            MVM_exception_throw_adhoc(tc, "UNIX sockets are unsupported on Windows.");
+#else
+            struct sockaddr_un *dest_un;
+            MVMuint64 host_len = strlen(host_cstr);
+
+            if (host_len > 107) {
+                char *waste[] = { host_cstr, NULL };
+                MVM_free(host_cstr);
+                MVM_exception_throw_adhoc_free(tc, waste, "Socket path '%s' is %"PRIu64" characters, max allowed is 107", host_cstr, host_len);
+            }
+
+            dest_un = MVM_malloc(sizeof(struct sockaddr_un));
+            dest_un->sun_family = AF_UNIX;
+            strcpy(dest_un->sun_path, host_cstr);
+            MVM_free(host_cstr);
+
+            result = MVM_calloc(1, sizeof(struct addrinfo));
+            result->ai_family = AF_UNIX;
+            result->ai_addrlen = sizeof(struct sockaddr_un);
+            result->ai_addr = (struct sockaddr *)dest_un;
+            return result;
+#endif
+        }
         default:
             MVM_exception_throw_adhoc(tc, "Unsupported socket family: %hu", family);
+    }
+
+    switch (type) {
+        case 0:
+            hints.ai_socktype = 0;
             break;
+        case SOCKET_TYPE_STREAM:
+            hints.ai_socktype = SOCK_STREAM;
+            break;
+        case SOCKET_TYPE_DGRAM:
+            hints.ai_socktype = SOCK_DGRAM;
+            break;
+        case SOCKET_TYPE_RAW:
+            hints.ai_socktype = SOCK_RAW;
+            break;
+        case SOCKET_TYPE_RDM:
+            hints.ai_socktype = SOCK_RDM;
+            break;
+        case SOCKET_TYPE_SEQPACKET:
+            hints.ai_socktype = SOCK_SEQPACKET;
+            break;
+        default:
+            MVM_exception_throw_adhoc(tc, "Unsupported socket type: %lld", type);
     }
 
-    hints.ai_socktype = 0;
-    hints.ai_flags = AI_PASSIVE;
-    hints.ai_protocol = 0;
-    hints.ai_addrlen = 0;
-    hints.ai_addr = NULL;
-    hints.ai_canonname = NULL;
-    hints.ai_next = NULL;
+    switch (protocol) {
+        case 0:
+            hints.ai_protocol = 0;
+            break;
+        case SOCKET_PROTOCOL_TCP:
+            hints.ai_protocol = IPPROTO_TCP;
+            break;
+        case SOCKET_PROTOCOL_UDP:
+            hints.ai_protocol = IPPROTO_UDP;
+            break;
+        case SOCKET_PROTOCOL_RAW:
+            hints.ai_protocol = IPPROTO_RAW;
+            break;
+        default:
+            MVM_exception_throw_adhoc(tc, "Unsupported socket protocol: %lld", protocol);
+    }
     snprintf(port_cstr, 8, "%d", (int)port);
 
     MVM_gc_mark_thread_blocked(tc);
     error = getaddrinfo(host_cstr, port_cstr, &hints, &result);
     MVM_gc_mark_thread_unblocked(tc);
-    if (error == 0) {
-        size_t size = get_struct_size_for_family(result->ai_addr->sa_family);
-        MVM_free(host_cstr);
-        dest = MVM_malloc(size);
-        memcpy(dest, result->ai_addr, size);
-    }
-    else {
+
+    if (error) {
        char *waste[] = { host_cstr, NULL };
        MVM_exception_throw_adhoc_free(tc, waste, "Failed to resolve host name '%s' with family %hu. Error: '%s'", host_cstr, family, gai_strerror(error));
     }
-    freeaddrinfo(result);
-    return dest;
+    return result;
 }
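Editorial note on the new contract (sketch only, mirroring what socket_connect below and write_bytes_to above already do): MVM_io_resolve_host_name now hands back the whole getaddrinfo chain rather than a single copied sockaddr, so callers iterate the records, try each one in turn, and eventually release the chain with freeaddrinfo (in the async paths that happens in the tasks' gc_free hooks). Something like the following, inside a function where tc, host and port are in scope:

    struct addrinfo *records = MVM_io_resolve_host_name(tc, host, port,
        SOCKET_FAMILY_UNSPEC, SOCKET_TYPE_STREAM, SOCKET_PROTOCOL_TCP, 0 /* connect(2)-style lookup */);
    struct addrinfo *record;
    for (record = records; record != NULL; record = record->ai_next) {
        /* try record->ai_family / ai_socktype / ai_protocol / ai_addr here;
         * stop at the first address that works */
    }
    freeaddrinfo(records);   /* the caller owns the whole chain */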
 
 /* Establishes a connection. */
 static void socket_connect(MVMThreadContext *tc, MVMOSHandle *h, MVMString *host, MVMint64 port, MVMuint16 family) {
-    MVMIOSyncSocketData *data = (MVMIOSyncSocketData *)h->body.data;
-    unsigned int interval_id;
+    MVMIOSyncSocketData *data = (MVMIOSyncSocketData *)h->body.data;
+    unsigned int interval_id = MVM_telemetry_interval_start(tc, "syncsocket connect");
 
-    interval_id = MVM_telemetry_interval_start(tc, "syncsocket connect");
     if (!data->handle) {
-        struct sockaddr *dest = MVM_io_resolve_host_name(tc, host, port, family);
-        int r;
+        struct addrinfo *result = MVM_io_resolve_host_name(tc, host, port, family, SOCKET_TYPE_STREAM, SOCKET_PROTOCOL_TCP, 0);
+        struct addrinfo *record;
+        Socket s;
+        const char *errmsg = NULL;
 
-        Socket s = socket(dest->sa_family , SOCK_STREAM , 0);
-        if (MVM_IS_SOCKET_ERROR(s)) {
-            MVM_free(dest);
-            MVM_telemetry_interval_stop(tc, interval_id, "syncsocket connect");
-            throw_error(tc, s, "create socket");
-        }
+        for (record = result; record != NULL; record = record->ai_next) {
+            int r;
 
-        do {
             MVM_gc_mark_thread_blocked(tc);
-            r = connect(s, dest, (socklen_t)get_struct_size_for_family(dest->sa_family));
+            s = socket(record->ai_family, record->ai_socktype, record->ai_protocol);
             MVM_gc_mark_thread_unblocked(tc);
-        } while(r == -1 && errno == EINTR);
-        MVM_free(dest);
-        if (MVM_IS_SOCKET_ERROR(r)) {
-            MVM_telemetry_interval_stop(tc, interval_id, "syncsocket connect");
-            throw_error(tc, s, "connect socket");
+            if (MVM_IS_SOCKET_ERROR(s)) {
+                errmsg = "create socket";
+                continue;
+            }
+
+            do {
+                MVM_gc_mark_thread_blocked(tc);
+                r = connect(s, record->ai_addr, record->ai_addrlen);
+                MVM_gc_mark_thread_unblocked(tc);
+            } while (r == -1 && errno == EINTR);
+
+            if (MVM_IS_SOCKET_ERROR(r)) {
+                int saved_errno;
+
+                MVM_gc_mark_thread_blocked(tc);
+                saved_errno = errno;
+                close(s);
+                errno = saved_errno;
+                MVM_gc_mark_thread_unblocked(tc);
+
+                errmsg = "connect socket";
+                continue;
+            }
+
+            data->handle = s;
+            errmsg = NULL;
+            break;
         }
-        data->handle = s;
+        if (errmsg != NULL) {
+            MVM_telemetry_interval_stop(tc, interval_id, "syncsocket connect");
+            throw_error(tc, s, errmsg);
+        }
     }
     else {
         MVM_telemetry_interval_stop(tc, interval_id, "syncsocket didn't connect");
@@ -394,38 +462,77 @@ static void socket_connect(MVMThreadContext *tc, MVMOSHandle *h, MVMString *host
 static void socket_bind(MVMThreadContext *tc, MVMOSHandle *h, MVMString *host, MVMint64 port, MVMuint16 family, MVMint32 backlog) {
     MVMIOSyncSocketData *data = (MVMIOSyncSocketData *)h->body.data;
     if (!data->handle) {
-        struct sockaddr *dest = MVM_io_resolve_host_name(tc, host, port, family);
-        int r;
+        struct addrinfo *result = MVM_io_resolve_host_name(tc, host, port, family, SOCKET_TYPE_STREAM, SOCKET_PROTOCOL_TCP, 1);
+        struct addrinfo *record;
+        Socket s;
+        const char *errmsg = NULL;
 
-        Socket s = socket(dest->sa_family , SOCK_STREAM , 0);
-        if (MVM_IS_SOCKET_ERROR(s)) {
-            MVM_free(dest);
-            throw_error(tc, s, "create socket");
-        }
+        for (record = result; record != NULL; record = record->ai_next) {
+            int r;
 
-        /* On POSIX, we set the SO_REUSEADDR option, which allows re-use of
-         * a port in TIME_WAIT state (modulo many hair details). Oringinally,
-         * MoarVM used libuv, which does this automatically on non-Windows.
-         * We have tests with bring up a server, then take it down, and then
-         * bring another up on the same port, and we get test failures due
-         * to racing to re-use the port without this. */
+            MVM_gc_mark_thread_blocked(tc);
+            s = socket(record->ai_family, record->ai_socktype, record->ai_protocol);
+            MVM_gc_mark_thread_unblocked(tc);
+            if (MVM_IS_SOCKET_ERROR(s)) {
+                errmsg = "create socket";
+                continue;
+            }
+
+            /* On POSIX, we set the SO_REUSEADDR option, which allows re-use of
+             * a port in TIME_WAIT state (modulo many hairy details). Originally,
+             * MoarVM used libuv, which does this automatically on non-Windows.
+             * We have tests which bring up a server, then take it down, and then
+             * bring another up on the same port, and we get test failures due
+             * to racing to re-use the port without this. */
 #ifndef _WIN32
-        {
-            int one = 1;
-            setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
-        }
+            {
+                int one = 1;
+
+                MVM_gc_mark_thread_blocked(tc);
+                setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+                MVM_gc_mark_thread_unblocked(tc);
+            }
 #endif
-        r = bind(s, dest, (socklen_t)get_struct_size_for_family(dest->sa_family));
-        MVM_free(dest);
-        if (MVM_IS_SOCKET_ERROR(r))
-            throw_error(tc, s, "bind socket");
+            MVM_gc_mark_thread_blocked(tc);
+            r = bind(s, record->ai_addr, record->ai_addrlen);
+            MVM_gc_mark_thread_unblocked(tc);
+            if (MVM_IS_SOCKET_ERROR(r)) {
+                int saved_errno;
-        r = listen(s, (int)backlog);
-        if (MVM_IS_SOCKET_ERROR(r))
-            throw_error(tc, s, "start listening on socket");
+                MVM_gc_mark_thread_blocked(tc);
+                saved_errno = errno;
+                close(s);
+                errno = saved_errno;
+                MVM_gc_mark_thread_unblocked(tc);
-        data->handle = s;
+                errmsg = "bind socket";
+                continue;
+            }
+
+            MVM_gc_mark_thread_blocked(tc);
+            r = listen(s, (int)backlog);
+            MVM_gc_mark_thread_unblocked(tc);
+            if (MVM_IS_SOCKET_ERROR(r)) {
+                int saved_errno;
+
+                MVM_gc_mark_thread_blocked(tc);
+                saved_errno = errno;
+                close(s);
+                errno = saved_errno;
+                MVM_gc_mark_thread_unblocked(tc);
+
+                errmsg = "start listening on socket";
+                continue;
+            }
+
+            data->handle = s;
+            errmsg = NULL;
+            break;
+        }
+
+        if (errmsg != NULL)
+            throw_error(tc, s, errmsg);
     }
     else {
         MVM_exception_throw_adhoc(tc, "Socket is already bound or connected");
diff --git a/src/io/syncsocket.h b/src/io/syncsocket.h
index 5a757eee57..31dce86e22 100644
--- a/src/io/syncsocket.h
+++ b/src/io/syncsocket.h
@@ -5,6 +5,28 @@ typedef enum {
     SOCKET_FAMILY_UNIX
 } MVMSocketFamily;
 
+typedef enum {
+    SOCKET_TYPE_STREAM = 1,
+    SOCKET_TYPE_DGRAM,
+    SOCKET_TYPE_RAW,
+    SOCKET_TYPE_RDM,
+    SOCKET_TYPE_SEQPACKET
+} MVMSocketType;
+
+typedef enum {
+    SOCKET_PROTOCOL_TCP = 1,
+    SOCKET_PROTOCOL_UDP,
+    SOCKET_PROTOCOL_RAW
+} MVMSocketProtocol;
+
 MVMObject * MVM_io_socket_create(MVMThreadContext *tc, MVMint64 listen);
-struct sockaddr * MVM_io_resolve_host_name(MVMThreadContext *tc, MVMString *host, MVMint64 port, MVMuint16 family);
+struct addrinfo * MVM_io_resolve_host_name(
+    MVMThreadContext *tc,
+    MVMString *host,
+    MVMint64 port,
+    MVMuint16 family,
+    MVMint64 type,
+    MVMint64 protocol,
+    MVMint64 passive
+);
 MVMString * MVM_io_get_hostname(MVMThreadContext *tc);
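For reference, the connect-side pattern the reworked socket_connect follows is the standard POSIX getaddrinfo fallback loop: try each resolved record, close the socket and move on when socket() or connect() fails, and give up only when every record has been exhausted. A self-contained sketch in plain C, outside MoarVM (connect_any is a hypothetical helper for illustration):

    #include <netdb.h>
    #include <netinet/in.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <unistd.h>

    static int connect_any(const char *host, const char *port) {
        struct addrinfo hints, *res, *rp;
        int fd = -1;

        memset(&hints, 0, sizeof(hints));
        hints.ai_family   = AF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_protocol = IPPROTO_TCP;
        if (getaddrinfo(host, port, &hints, &res) != 0)
            return -1;

        for (rp = res; rp != NULL; rp = rp->ai_next) {
            fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
            if (fd < 0)
                continue;                   /* try the next record */
            if (connect(fd, rp->ai_addr, rp->ai_addrlen) == 0)
                break;                      /* connected */
            close(fd);                      /* failed: close and fall through */
            fd = -1;
        }
        freeaddrinfo(res);
        return fd;                          /* -1 if every record failed */
    }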