Skip to content

Commit

Permalink
Never call freeaddrinfo more than once, always hold on to socket errors
Browse files Browse the repository at this point in the history
This makes debugging sockets easier.
  • Loading branch information
Kaiepi committed Oct 7, 2019
1 parent 2f4798b commit d9a067d
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 144 deletions.
175 changes: 79 additions & 96 deletions src/io/asyncsocket.c
Expand Up @@ -8,11 +8,12 @@ typedef struct {

/* Info we convey about a read task. */
typedef struct {
MVMThreadContext *tc;
int work_idx;
MVMOSHandle *handle;
MVMObject *buf_type;
int seq_number;
MVMThreadContext *tc;
int work_idx;
int error;
} ReadInfo;

/* Allocates a buffer of the suggested size. */
Expand Down Expand Up @@ -107,7 +108,6 @@ static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_t
uv_handle_t *handle;
MVMAsyncTask *t = (MVMAsyncTask *)async_task;
MVMObject *arr;
int r;

/* Add to work in progress. */
ri->tc = tc;
Expand All @@ -127,7 +127,7 @@ static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_t
goto closed;

/* Start reading the stream. */
if ((r = uv_read_start(handle_data->handle, on_alloc, on_read)) == 0)
if ((ri->error = uv_read_start(handle_data->handle, on_alloc, on_read)) == 0)
/* Success; finish up in on_read. */
return;

Expand All @@ -140,7 +140,7 @@ static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_t
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
MVMROOT4(tc, t, arr, ri->handle, ri->buf_type, {
MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
tc->instance->VMString, uv_strerror(r));
tc->instance->VMString, uv_strerror(ri->error));
MVMObject *msg_box = MVM_repr_box_str(tc,
tc->instance->boot_types.BOOTStr, msg_str);
MVM_repr_push_o(tc, arr, msg_box);
Expand Down Expand Up @@ -170,6 +170,7 @@ static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_t
/* Stops reading. */
static void read_cancel(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
ReadInfo *ri = (ReadInfo *)data;

if (ri->work_idx >= 0) {
MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data;
uv_handle_t *handle = (uv_handle_t *)handle_data->handle;
Expand Down Expand Up @@ -246,12 +247,13 @@ static MVMAsyncTask * read_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject

/* Info we convey about a write task. */
typedef struct {
MVMThreadContext *tc;
int work_idx;
MVMOSHandle *handle;
MVMObject *buf_data;
uv_write_t *req;
uv_buf_t buf;
MVMThreadContext *tc;
int work_idx;
int error;
} WriteInfo;

/* Completion handler for an asynchronous write. */
Expand Down Expand Up @@ -298,7 +300,6 @@ static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_
int output_size;
MVMAsyncTask *t = (MVMAsyncTask *)async_task;
MVMObject *arr;
int r;

/* Add to work in progress. */
wi->tc = tc;
Expand Down Expand Up @@ -326,7 +327,7 @@ static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_
goto closed;

/* Do our write. */
if ((r = uv_write(wi->req, handle_data->handle, &(wi->buf), 1, on_write)) == 0)
if ((wi->error = uv_write(wi->req, handle_data->handle, &(wi->buf), 1, on_write)) == 0)
/* Success; finish up in on_write. */
return;

Expand All @@ -338,7 +339,7 @@ static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
MVMROOT4(tc, t, arr, wi->handle, wi->buf_data, {
MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
tc->instance->VMString, uv_strerror(r));
tc->instance->VMString, uv_strerror(wi->error));
MVMObject *msg_box = MVM_repr_box_str(tc,
tc->instance->boot_types.BOOTStr, msg_str);
MVM_repr_push_o(tc, arr, msg_box);
Expand Down Expand Up @@ -502,8 +503,8 @@ static MVMint64 socket_is_tty(MVMThreadContext *tc, MVMOSHandle *h) {
static MVMint64 socket_handle(MVMThreadContext *tc, MVMOSHandle *h) {
MVMIOAsyncSocketData *data = (MVMIOAsyncSocketData *)h->body.data;
uv_handle_t *handle = (uv_handle_t *)data->handle;
int fd;
uv_os_fd_t fh;
int fd;
uv_os_fd_t fh;

uv_fileno(handle, &fh);
fd = uv_open_osfhandle(fh);
Expand Down Expand Up @@ -585,10 +586,11 @@ typedef struct {
MVMThreadContext *tc;
uv_loop_t *loop;
MVMObject *async_task;
struct addrinfo *dest;
int work_idx;
struct addrinfo *records;
struct addrinfo *cur_record;
uv_tcp_t *socket;
uv_connect_t *connect;
int work_idx;
int error;
} ConnectInfo;

Expand Down Expand Up @@ -647,32 +649,24 @@ static void on_connect(uv_connect_t* req, int status) {

/* Does the actual work of making the connection. */
static void do_connect_setup(uv_handle_t *handle) {
ConnectInfo *ci = (ConnectInfo *)handle->data;
MVMThreadContext *tc = ci->tc;
struct addrinfo *record;
MVMAsyncTask *t = (MVMAsyncTask *)ci->async_task;
ConnectInfo *ci = (ConnectInfo *)handle->data;
MVMThreadContext *tc = ci->tc;
MVMAsyncTask *t = (MVMAsyncTask *)ci->async_task;
MVMObject *arr;

for (record = ci->dest; record != NULL; record = record->ai_next) {
for (; ci->cur_record != NULL; ci->cur_record = ci->cur_record->ai_next) {
if ((ci->error = uv_tcp_init(ci->loop, ci->socket)) != 0)
continue;

if ((ci->error = uv_tcp_connect(ci->connect, ci->socket, record->ai_addr, on_connect)) == 0)
/* Success; finish up in on_connect. */
return;

{
/* Error; try the rest of the addresses, if any, before throwing. */
struct addrinfo *next = record->ai_next;
record->ai_next = NULL;
freeaddrinfo(ci->dest);
ci->dest = next;
if ((ci->error = uv_tcp_connect(ci->connect, ci->socket, ci->cur_record->ai_addr, on_connect)) != 0)
/* Error; try again with the next address, if any, before throwing. */
uv_close(handle, do_connect_setup);
return;
}

/* If we succeeded, finish up in on_connect. */
return;
}

/* Error; need to notify. */
/* Error; no addresses could be used, so we need to notify. */
MVMROOT(tc, t, {
arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
});
Expand Down Expand Up @@ -715,7 +709,6 @@ static void connect_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *asyn
ci->socket->data = data;
ci->connect = MVM_malloc(sizeof(uv_connect_t));
ci->connect->data = data;
ci->error = 0;

do_connect_setup((uv_handle_t *)ci->socket);
}
Expand All @@ -724,8 +717,8 @@ static void connect_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *asyn
static void connect_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
if (data) {
ConnectInfo *ci = (ConnectInfo *)data;
if (ci->dest != NULL)
freeaddrinfo(ci->dest);
if (ci->records != NULL)
freeaddrinfo(ci->records);
MVM_free(ci);
}
}
Expand All @@ -744,7 +737,7 @@ MVMObject * MVM_io_socket_connect_async(MVMThreadContext *tc, MVMObject *queue,
MVMObject *schedulee, MVMString *host,
MVMint64 port, MVMObject *async_type) {
MVMAsyncTask *task;
struct addrinfo *dest;
struct addrinfo *records;
ConnectInfo *ci;

/* Validate REPRs. */
Expand All @@ -758,14 +751,15 @@ MVMObject * MVM_io_socket_connect_async(MVMThreadContext *tc, MVMObject *queue,

MVMROOT4(tc, queue, schedulee, host, async_type, {
/* Resolve hostname. (Could be done asynchronously too.) */
dest = MVM_io_resolve_host_name(tc, host, port, SOCKET_FAMILY_UNSPEC, SOCKET_TYPE_STREAM, SOCKET_PROTOCOL_TCP, 0);
records = MVM_io_resolve_host_name(tc, host, port, SOCKET_FAMILY_UNSPEC, SOCKET_TYPE_STREAM, SOCKET_PROTOCOL_TCP, 0);
/* Create async task handle. */
task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
});

MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue); MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
ci = MVM_calloc(1, sizeof(ConnectInfo));
ci->dest = dest;
ci->records = records;
ci->cur_record = records;
task->body.data = ci;
task->body.ops = &connect_op_table;

Expand All @@ -784,7 +778,8 @@ typedef struct {
MVMObject *async_task;
int work_idx;
int backlog;
struct addrinfo *dest;
struct addrinfo *records;
struct addrinfo *cur_record;
uv_tcp_t *socket;
int error;
} ListenInfo;
Expand Down Expand Up @@ -862,64 +857,52 @@ static void on_connection(uv_stream_t *server, int status) {
}

static void do_listen_setup(uv_handle_t *handle) {
ListenInfo *li = (ListenInfo *)handle->data;
MVMThreadContext *tc = li->tc;
MVMAsyncTask *t = (MVMAsyncTask *)li->async_task;
ListenInfo *li = (ListenInfo *)handle->data;
MVMThreadContext *tc = li->tc;
MVMAsyncTask *t = (MVMAsyncTask *)li->async_task;
MVMObject *arr;
struct addrinfo *record;

for (record = li->dest; record != NULL; record = record->ai_next) {
for (; li->cur_record != NULL; li->cur_record = li->cur_record->ai_next) {
if ((li->error = uv_tcp_init(li->loop, li->socket)) != 0)
continue;

if ((li->error = uv_tcp_bind(li->socket, record->ai_addr, 0)) == 0 &&
(li->error = uv_listen((uv_stream_t *)li->socket, li->backlog, on_connection) == 0))
break;

{
if ((li->error = uv_tcp_bind(li->socket, li->cur_record->ai_addr, 0)) != 0 ||
(li->error = uv_listen((uv_stream_t *)li->socket, li->backlog, on_connection) != 0)) {
/* Error; try the rest of the addresses, if any, before throwing. */
struct addrinfo *next = record->ai_next;
record->ai_next = NULL;
freeaddrinfo(li->dest);
li->dest = next;
if (next == NULL)
goto error;
else {
uv_close(handle, do_listen_setup);
return;
}
}
}
li->cur_record = li->cur_record->ai_next;
uv_close(handle, do_listen_setup);
} else {
/* Success; allocate our handle. */
MVMROOT(tc, t, {
arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
});
MVM_repr_push_o(tc, arr, t->body.schedulee);
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
MVMROOT2(tc, t, arr, {
MVMOSHandle *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO);
MVMIOAsyncSocketData *data = MVM_calloc(1, sizeof(MVMIOAsyncSocketData));
data->handle = (uv_stream_t *)li->socket;
result->body.ops = &op_table;
result->body.data = data;
MVM_repr_push_o(tc, arr, (MVMObject *)result);

/* Success; allocate our handle. */
MVMROOT(tc, t, {
arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
});
MVM_repr_push_o(tc, arr, t->body.schedulee);
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
MVMROOT2(tc, t, arr, {
MVMOSHandle *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO);
MVMIOAsyncSocketData *data = MVM_calloc(1, sizeof(MVMIOAsyncSocketData));
data->handle = (uv_stream_t *)li->socket;
result->body.ops = &op_table;
result->body.data = data;
MVM_repr_push_o(tc, arr, (MVMObject *)result);

{
struct sockaddr_storage name;
int name_len = sizeof(struct sockaddr_storage);
uv_tcp_getsockname(li->socket, (struct sockaddr *)&name, &name_len);
push_name_and_port(tc, &name, arr);
{
struct sockaddr_storage name;
int name_len = sizeof(struct sockaddr_storage);
uv_tcp_getsockname(li->socket, (struct sockaddr *)&name, &name_len);
push_name_and_port(tc, &name, arr);
}
});
MVM_repr_push_o(tc, t->body.queue, arr);
}
});
MVM_repr_push_o(tc, t->body.queue, arr);
return;

error:
/* Error; need to notify. */
return;
}

/* Error; no addresses could be used, so we need to notify. */
MVMROOT(tc, t, {
arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
});
Expand Down Expand Up @@ -959,7 +942,6 @@ static void listen_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async
/* Create and initialize socket and connection, and start listening. */
li->socket = MVM_malloc(sizeof(uv_tcp_t));
li->socket->data = data;
li->error = 0;

do_listen_setup((uv_handle_t *)li->socket);
}
Expand Down Expand Up @@ -990,8 +972,8 @@ static void listen_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *work
static void listen_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
if (data != NULL) {
ListenInfo *li = (ListenInfo *)data;
if (li->dest != NULL)
freeaddrinfo(li->dest);
if (li->records != NULL)
freeaddrinfo(li->records);
MVM_free(li);
}
}
Expand All @@ -1011,7 +993,7 @@ MVMObject * MVM_io_socket_listen_async(MVMThreadContext *tc, MVMObject *queue,
MVMint64 port, MVMint32 backlog, MVMObject *async_type) {
MVMAsyncTask *task;
ListenInfo *li;
struct addrinfo *dest;
struct addrinfo *records;

/* Validate REPRs. */
if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
Expand All @@ -1023,16 +1005,17 @@ MVMObject * MVM_io_socket_listen_async(MVMThreadContext *tc, MVMObject *queue,

MVMROOT4(tc, queue, schedulee, host, async_type, {
/* Resolve hostname. (Could be done asynchronously too.) */
dest = MVM_io_resolve_host_name(tc, host, port, SOCKET_FAMILY_UNSPEC, SOCKET_TYPE_STREAM, SOCKET_PROTOCOL_TCP, 1);
records = MVM_io_resolve_host_name(tc, host, port, SOCKET_FAMILY_UNSPEC, SOCKET_TYPE_STREAM, SOCKET_PROTOCOL_TCP, 1);
/* Create async task handle. */
task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
});

MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
li = MVM_calloc(1, sizeof(ListenInfo));
li->dest = dest;
li->backlog = backlog;
li->records = records;
li->cur_record = records;
task->body.data = li;
task->body.ops = &listen_op_table;

Expand Down

0 comments on commit d9a067d

Please sign in to comment.