Skip to content

Commit

Permalink
Windows: Do simultaneous pending AcceptEx calls.
Browse files Browse the repository at this point in the history
  • Loading branch information
Igor Zinkovsky committed Aug 19, 2011
1 parent 1231193 commit 8448ee4
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 32 deletions.
13 changes: 9 additions & 4 deletions include/uv-win.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@ typedef struct uv_buf_t {
UV_REQ_FIELDS \
HANDLE pipeHandle; \
struct uv_pipe_accept_s* next_pending; \
} uv_pipe_accept_t;
} uv_pipe_accept_t; \
typedef struct uv_tcp_accept_s { \
UV_REQ_FIELDS \
SOCKET accept_socket; \
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
struct uv_tcp_accept_s* next_pending; \
} uv_tcp_accept_t;

#define uv_stream_connection_fields \
unsigned int write_reqs_pending; \
Expand All @@ -95,9 +101,8 @@ typedef struct uv_buf_t {
};

#define uv_tcp_server_fields \
SOCKET accept_socket; \
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
struct uv_req_s accept_req;
uv_tcp_accept_t* accept_reqs; \
uv_tcp_accept_t* pending_accepts;

#define uv_tcp_connection_fields \
uv_buf_t read_buffer;
Expand Down
95 changes: 67 additions & 28 deletions src/win/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@
const unsigned int uv_active_tcp_streams_threshold = 50;


/*
* Number of simultaneous pending AcceptEx calls.
*/
const unsigned int uv_simultaneous_server_accepts = 32;


/* Pointers to winsock extension functions to be retrieved dynamically */
static LPFN_CONNECTEX pConnectEx;
static LPFN_ACCEPTEX pAcceptEx;
Expand Down Expand Up @@ -293,10 +299,11 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) {
int uv_tcp_init(uv_tcp_t* handle) {
uv_stream_init((uv_stream_t*)handle);

handle->accept_reqs = NULL;
handle->pending_accepts = NULL;
handle->socket = INVALID_SOCKET;
handle->type = UV_TCP;
handle->reqs_pending = 0;
handle->accept_socket = INVALID_SOCKET;

uv_counters()->tcp_init++;

Expand All @@ -308,7 +315,8 @@ void uv_tcp_endgame(uv_tcp_t* handle) {
uv_err_t err;
int status;

if (handle->flags & UV_HANDLE_SHUTTING &&
if (handle->flags & UV_HANDLE_CONNECTION &&
handle->flags & UV_HANDLE_SHUTTING &&
!(handle->flags & UV_HANDLE_SHUT) &&
handle->write_reqs_pending == 0) {

Expand All @@ -333,6 +341,11 @@ void uv_tcp_endgame(uv_tcp_t* handle) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
handle->flags |= UV_HANDLE_CLOSED;

if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) {
free(handle->accept_reqs);
handle->accept_reqs = NULL;
}

if (handle->close_cb) {
handle->close_cb((uv_handle_t*)handle);
}
Expand Down Expand Up @@ -407,19 +420,15 @@ int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) {
}


static void uv_tcp_queue_accept(uv_tcp_t* handle) {
uv_req_t* req;
static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
BOOL success;
DWORD bytes;
SOCKET accept_socket;
short family;
LPFN_ACCEPTEX pAcceptExFamily;

assert(handle->flags & UV_HANDLE_LISTENING);
assert(handle->accept_socket == INVALID_SOCKET);

/* Prepare the uv_req structure. */
req = &handle->accept_req;
assert(req->accept_socket == INVALID_SOCKET);

/* choose family and extension function */
if ((handle->flags & UV_HANDLE_IPV6) != 0) {
Expand All @@ -434,7 +443,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {
accept_socket = socket(family, SOCK_STREAM, 0);
if (accept_socket == INVALID_SOCKET) {
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
uv_insert_pending_req((uv_req_t*)req);
handle->reqs_pending++;
return;
}
Expand All @@ -444,7 +453,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {

success = pAcceptExFamily(handle->socket,
accept_socket,
(void*)&handle->accept_buffer,
(void*)req->accept_buffer,
0,
sizeof(struct sockaddr_storage),
sizeof(struct sockaddr_storage),
Expand All @@ -453,17 +462,17 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {

if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
/* Process the req without IOCP. */
handle->accept_socket = accept_socket;
req->accept_socket = accept_socket;
handle->reqs_pending++;
uv_insert_pending_req((uv_req_t*)req);
} else if (UV_SUCCEEDED_WITH_IOCP(success)) {
/* The req will be processed with IOCP. */
handle->accept_socket = accept_socket;
req->accept_socket = accept_socket;
handle->reqs_pending++;
} else {
/* Make this req pending reporting an error. */
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
uv_insert_pending_req((uv_req_t*)req);
handle->reqs_pending++;
/* Destroy the preallocated client socket. */
closesocket(accept_socket);
Expand Down Expand Up @@ -527,6 +536,9 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) {


int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
unsigned int i;
uv_tcp_accept_t* req;

assert(backlog > 0);

if (handle->flags & UV_HANDLE_BIND_ERROR) {
Expand All @@ -553,10 +565,21 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
handle->flags |= UV_HANDLE_LISTENING;
handle->connection_cb = cb;

uv_req_init(&(handle->accept_req));
handle->accept_req.type = UV_ACCEPT;
handle->accept_req.data = handle;
uv_tcp_queue_accept(handle);
assert(!handle->accept_reqs);
handle->accept_reqs = (uv_tcp_accept_t*)
malloc(uv_simultaneous_server_accepts * sizeof(uv_tcp_accept_t));
if (!handle->accept_reqs) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}

for (i = 0; i < uv_simultaneous_server_accepts; i++) {
req = &handle->accept_reqs[i];
uv_req_init((uv_req_t*)req);
req->type = UV_ACCEPT;
req->accept_socket = INVALID_SOCKET;
req->data = handle;
uv_tcp_queue_accept(handle, req);
}

return 0;
}
Expand All @@ -565,22 +588,33 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
int rv = 0;

if (server->accept_socket == INVALID_SOCKET) {
uv_tcp_accept_t* req = server->pending_accepts;

if (!req) {
/* No valid connections found, so we error out. */
uv_set_sys_error(WSAEWOULDBLOCK);
return -1;
}

if (req->accept_socket == INVALID_SOCKET) {
uv_set_sys_error(WSAENOTCONN);
return -1;
}

if (uv_tcp_set_socket(client, server->accept_socket) == -1) {
closesocket(server->accept_socket);
if (uv_tcp_set_socket(client, req->accept_socket) == -1) {
closesocket(req->accept_socket);
rv = -1;
} else {
uv_connection_init((uv_stream_t*)client);
}

server->accept_socket = INVALID_SOCKET;
/* Prepare the req to pick up a new connection */
server->pending_accepts = req->next_pending;
req->next_pending = NULL;
req->accept_socket = INVALID_SOCKET;

if (!(server->flags & UV_HANDLE_CLOSING)) {
uv_tcp_queue_accept(server);
uv_tcp_queue_accept(server, req);
}

active_tcp_streams++;
Expand Down Expand Up @@ -905,14 +939,16 @@ void uv_process_tcp_write_req(uv_tcp_t* handle, uv_write_t* req) {
}


void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* raw_req) {
uv_tcp_accept_t* req = (uv_tcp_accept_t*) raw_req;

assert(handle->type == UV_TCP);

/* If handle->accepted_socket is not a valid socket, then */
/* uv_queue_accept must have failed. This is a serious error. We stop */
/* accepting connections and report this error to the connection */
/* callback. */
if (handle->accept_socket == INVALID_SOCKET) {
if (req->accept_socket == INVALID_SOCKET) {
if (handle->flags & UV_HANDLE_LISTENING) {
handle->flags &= ~UV_HANDLE_LISTENING;
if (handle->connection_cb) {
Expand All @@ -921,11 +957,14 @@ void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
}
}
} else if (req->error.code == UV_OK &&
setsockopt(handle->accept_socket,
setsockopt(req->accept_socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&handle->socket,
sizeof(handle->socket)) == 0) {
req->next_pending = handle->pending_accepts;
handle->pending_accepts = req;

/* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
if (handle->connection_cb) {
handle->connection_cb((uv_stream_t*)handle, 0);
Expand All @@ -934,10 +973,10 @@ void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
/* Error related to accepted socket is ignored because the server */
/* socket may still be healthy. If the server socket is broken
/* uv_queue_accept will detect it. */
closesocket(handle->accept_socket);
handle->accept_socket = INVALID_SOCKET;
closesocket(req->accept_socket);
req->accept_socket = INVALID_SOCKET;
if (handle->flags & UV_HANDLE_LISTENING) {
uv_tcp_queue_accept(handle);
uv_tcp_queue_accept(handle, req);
}
}

Expand Down

0 comments on commit 8448ee4

Please sign in to comment.