Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Windows: Do simultaneous pending AcceptEx calls.

  • Loading branch information...
commit 8448ee4bf62f0a7ae7cab389e3cf4b473669ebd4 1 parent 1231193
Igor Zinkovsky authored
Showing with 76 additions and 32 deletions.
  1. +9 −4 include/uv-win.h
  2. +67 −28 src/win/tcp.c
View
13 include/uv-win.h
@@ -75,7 +75,13 @@ typedef struct uv_buf_t {
UV_REQ_FIELDS \
HANDLE pipeHandle; \
struct uv_pipe_accept_s* next_pending; \
- } uv_pipe_accept_t;
+ } uv_pipe_accept_t; \
+ typedef struct uv_tcp_accept_s { \
+ UV_REQ_FIELDS \
+ SOCKET accept_socket; \
+ char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
+ struct uv_tcp_accept_s* next_pending; \
+ } uv_tcp_accept_t;
#define uv_stream_connection_fields \
unsigned int write_reqs_pending; \
@@ -95,9 +101,8 @@ typedef struct uv_buf_t {
};
#define uv_tcp_server_fields \
- SOCKET accept_socket; \
- char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
- struct uv_req_s accept_req;
+ uv_tcp_accept_t* accept_reqs; \
+ uv_tcp_accept_t* pending_accepts;
#define uv_tcp_connection_fields \
uv_buf_t read_buffer;
View
95 src/win/tcp.c
@@ -109,6 +109,12 @@
const unsigned int uv_active_tcp_streams_threshold = 50;
+/*
+ * Number of simultaneous pending AcceptEx calls.
+ */
+const unsigned int uv_simultaneous_server_accepts = 32;
+
+
/* Pointers to winsock extension functions to be retrieved dynamically */
static LPFN_CONNECTEX pConnectEx;
static LPFN_ACCEPTEX pAcceptEx;
@@ -293,10 +299,11 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) {
int uv_tcp_init(uv_tcp_t* handle) {
uv_stream_init((uv_stream_t*)handle);
+ handle->accept_reqs = NULL;
+ handle->pending_accepts = NULL;
handle->socket = INVALID_SOCKET;
handle->type = UV_TCP;
handle->reqs_pending = 0;
- handle->accept_socket = INVALID_SOCKET;
uv_counters()->tcp_init++;
@@ -308,7 +315,8 @@ void uv_tcp_endgame(uv_tcp_t* handle) {
uv_err_t err;
int status;
- if (handle->flags & UV_HANDLE_SHUTTING &&
+ if (handle->flags & UV_HANDLE_CONNECTION &&
+ handle->flags & UV_HANDLE_SHUTTING &&
!(handle->flags & UV_HANDLE_SHUT) &&
handle->write_reqs_pending == 0) {
@@ -333,6 +341,11 @@ void uv_tcp_endgame(uv_tcp_t* handle) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
handle->flags |= UV_HANDLE_CLOSED;
+ if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) {
+ free(handle->accept_reqs);
+ handle->accept_reqs = NULL;
+ }
+
if (handle->close_cb) {
handle->close_cb((uv_handle_t*)handle);
}
@@ -407,8 +420,7 @@ int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) {
}
-static void uv_tcp_queue_accept(uv_tcp_t* handle) {
- uv_req_t* req;
+static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
BOOL success;
DWORD bytes;
SOCKET accept_socket;
@@ -416,10 +428,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {
LPFN_ACCEPTEX pAcceptExFamily;
assert(handle->flags & UV_HANDLE_LISTENING);
- assert(handle->accept_socket == INVALID_SOCKET);
-
- /* Prepare the uv_req structure. */
- req = &handle->accept_req;
+ assert(req->accept_socket == INVALID_SOCKET);
/* choose family and extension function */
if ((handle->flags & UV_HANDLE_IPV6) != 0) {
@@ -434,7 +443,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {
accept_socket = socket(family, SOCK_STREAM, 0);
if (accept_socket == INVALID_SOCKET) {
req->error = uv_new_sys_error(WSAGetLastError());
- uv_insert_pending_req(req);
+ uv_insert_pending_req((uv_req_t*)req);
handle->reqs_pending++;
return;
}
@@ -444,7 +453,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {
success = pAcceptExFamily(handle->socket,
accept_socket,
- (void*)&handle->accept_buffer,
+ (void*)req->accept_buffer,
0,
sizeof(struct sockaddr_storage),
sizeof(struct sockaddr_storage),
@@ -453,17 +462,17 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
/* Process the req without IOCP. */
- handle->accept_socket = accept_socket;
+ req->accept_socket = accept_socket;
handle->reqs_pending++;
uv_insert_pending_req((uv_req_t*)req);
} else if (UV_SUCCEEDED_WITH_IOCP(success)) {
/* The req will be processed with IOCP. */
- handle->accept_socket = accept_socket;
+ req->accept_socket = accept_socket;
handle->reqs_pending++;
} else {
/* Make this req pending reporting an error. */
req->error = uv_new_sys_error(WSAGetLastError());
- uv_insert_pending_req(req);
+ uv_insert_pending_req((uv_req_t*)req);
handle->reqs_pending++;
/* Destroy the preallocated client socket. */
closesocket(accept_socket);
@@ -527,6 +536,9 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) {
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
+ unsigned int i;
+ uv_tcp_accept_t* req;
+
assert(backlog > 0);
if (handle->flags & UV_HANDLE_BIND_ERROR) {
@@ -553,10 +565,21 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
handle->flags |= UV_HANDLE_LISTENING;
handle->connection_cb = cb;
- uv_req_init(&(handle->accept_req));
- handle->accept_req.type = UV_ACCEPT;
- handle->accept_req.data = handle;
- uv_tcp_queue_accept(handle);
+ assert(!handle->accept_reqs);
+ handle->accept_reqs = (uv_tcp_accept_t*)
+ malloc(uv_simultaneous_server_accepts * sizeof(uv_tcp_accept_t));
+ if (!handle->accept_reqs) {
+ uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+ }
+
+ for (i = 0; i < uv_simultaneous_server_accepts; i++) {
+ req = &handle->accept_reqs[i];
+ uv_req_init((uv_req_t*)req);
+ req->type = UV_ACCEPT;
+ req->accept_socket = INVALID_SOCKET;
+ req->data = handle;
+ uv_tcp_queue_accept(handle, req);
+ }
return 0;
}
@@ -565,22 +588,33 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
int rv = 0;
- if (server->accept_socket == INVALID_SOCKET) {
+ uv_tcp_accept_t* req = server->pending_accepts;
+
+ if (!req) {
+ /* No valid connections found, so we error out. */
+ uv_set_sys_error(WSAEWOULDBLOCK);
+ return -1;
+ }
+
+ if (req->accept_socket == INVALID_SOCKET) {
uv_set_sys_error(WSAENOTCONN);
return -1;
}
- if (uv_tcp_set_socket(client, server->accept_socket) == -1) {
- closesocket(server->accept_socket);
+ if (uv_tcp_set_socket(client, req->accept_socket) == -1) {
+ closesocket(req->accept_socket);
rv = -1;
} else {
uv_connection_init((uv_stream_t*)client);
}
- server->accept_socket = INVALID_SOCKET;
+ /* Prepare the req to pick up a new connection */
+ server->pending_accepts = req->next_pending;
+ req->next_pending = NULL;
+ req->accept_socket = INVALID_SOCKET;
if (!(server->flags & UV_HANDLE_CLOSING)) {
- uv_tcp_queue_accept(server);
+ uv_tcp_queue_accept(server, req);
}
active_tcp_streams++;
@@ -905,14 +939,16 @@ void uv_process_tcp_write_req(uv_tcp_t* handle, uv_write_t* req) {
}
-void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
+void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* raw_req) {
+ uv_tcp_accept_t* req = (uv_tcp_accept_t*) raw_req;
+
assert(handle->type == UV_TCP);
/* If handle->accepted_socket is not a valid socket, then */
/* uv_queue_accept must have failed. This is a serious error. We stop */
/* accepting connections and report this error to the connection */
/* callback. */
- if (handle->accept_socket == INVALID_SOCKET) {
+ if (req->accept_socket == INVALID_SOCKET) {
if (handle->flags & UV_HANDLE_LISTENING) {
handle->flags &= ~UV_HANDLE_LISTENING;
if (handle->connection_cb) {
@@ -921,11 +957,14 @@ void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
}
}
} else if (req->error.code == UV_OK &&
- setsockopt(handle->accept_socket,
+ setsockopt(req->accept_socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&handle->socket,
sizeof(handle->socket)) == 0) {
+ req->next_pending = handle->pending_accepts;
+ handle->pending_accepts = req;
+
/* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
if (handle->connection_cb) {
handle->connection_cb((uv_stream_t*)handle, 0);
@@ -934,10 +973,10 @@ void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
/* Error related to accepted socket is ignored because the server */
/* socket may still be healthy. If the server socket is broken
/* uv_queue_accept will detect it. */
- closesocket(handle->accept_socket);
- handle->accept_socket = INVALID_SOCKET;
+ closesocket(req->accept_socket);
+ req->accept_socket = INVALID_SOCKET;
if (handle->flags & UV_HANDLE_LISTENING) {
- uv_tcp_queue_accept(handle);
+ uv_tcp_queue_accept(handle, req);
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.