Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

windows: Enable passing of TCP connections over IPC

  • Loading branch information...
commit 74a773ddf937153e22fdfd570c5546a0c4107b0d 1 parent 31ff986
Igor Zinkovsky authored
View
13 include/uv-private/uv-win.h
@@ -214,7 +214,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
UV_PROCESS_EXIT, \
UV_PROCESS_CLOSE, \
UV_UDP_RECV, \
- UV_FS_EVENT_REQ
+ UV_FS_EVENT_REQ, \
+ UV_TCP_CLOSE
#define UV_REQ_PRIVATE_FIELDS \
union { \
@@ -286,7 +287,10 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
#define uv_tcp_connection_fields \
uv_buf_t read_buffer; \
- LPFN_CONNECTEX func_connectex;
+ LPFN_CONNECTEX func_connectex; \
+ struct uv_tcp_close_s { \
+ UV_REQ_FIELDS \
+ } close_req;
#define UV_TCP_PRIVATE_FIELDS \
SOCKET socket; \
@@ -318,7 +322,10 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
uv_write_t ipc_header_write_req; \
int ipc_pid; \
uint64_t remaining_ipc_rawdata_bytes; \
- WSAPROTOCOL_INFOW* pending_socket_info; \
+ struct { \
+ WSAPROTOCOL_INFOW* socket_info; \
+ int tcp_connection; \
+ } pending_ipc_info; \
uv_write_t* non_overlapped_writes_tail;
#define UV_PIPE_PRIVATE_FIELDS \
View
15 src/win/handle.c
@@ -71,7 +71,6 @@ int uv_is_active(uv_handle_t* handle) {
void uv_close(uv_handle_t* handle, uv_close_cb cb) {
- uv_tcp_t* tcp;
uv_pipe_t* pipe;
uv_udp_t* udp;
uv_process_t* process;
@@ -88,19 +87,7 @@ void uv_close(uv_handle_t* handle, uv_close_cb cb) {
/* Handle-specific close actions */
switch (handle->type) {
case UV_TCP:
- tcp = (uv_tcp_t*)handle;
- /* If we don't shutdown before calling closesocket, windows will */
- /* silently discard the kernel send buffer and reset the connection. */
- if ((tcp->flags & UV_HANDLE_CONNECTION) &&
- !(tcp->flags & UV_HANDLE_SHUT)) {
- shutdown(tcp->socket, SD_SEND);
- tcp->flags |= UV_HANDLE_SHUTTING | UV_HANDLE_SHUT;
- }
- tcp->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING);
- closesocket(tcp->socket);
- if (tcp->reqs_pending == 0) {
- uv_want_endgame(loop, handle);
- }
+ uv_tcp_close((uv_tcp_t*)handle);
return;
case UV_NAMED_PIPE:
View
12 src/win/internal.h
@@ -68,7 +68,7 @@ void uv_process_timers(uv_loop_t* loop);
#define UV_HANDLE_NON_OVERLAPPED_PIPE 0x00200000
#define UV_HANDLE_TTY_SAVED_POSITION 0x00400000
#define UV_HANDLE_TTY_SAVED_ATTRIBUTES 0x00800000
-#define UV_HANDLE_SHARED_TCP_SERVER 0x01000000
+#define UV_HANDLE_SHARED_TCP_SOCKET 0x01000000
#define UV_HANDLE_TCP_NODELAY 0x02000000
#define UV_HANDLE_TCP_KEEPALIVE 0x04000000
#define UV_HANDLE_TCP_SINGLE_ACCEPT 0x08000000
@@ -143,11 +143,15 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle);
-int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info);
+int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,
+ int tcp_connection);
int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
LPWSAPROTOCOL_INFOW protocol_info);
+void uv_tcp_process_close(uv_loop_t* loop, uv_tcp_t* tcp);
+void uv_tcp_close(uv_tcp_t* tcp);
+
/*
* UDP
@@ -337,6 +341,10 @@ int WSAAPI uv_wsarecvfrom_workaround(SOCKET socket, WSABUF* buffers,
/* Whether ipv6 is supported */
extern int uv_allow_ipv6;
+/* Whether there are any LSPs stacked on TCP */
+extern int uv_tcp_no_lsps_ipv4;
+extern int uv_tcp_no_lsps_ipv6;
+
/* Whether there are any non-IFS LSPs stacked on TCP */
extern int uv_tcp_non_ifs_lsp_ipv4;
extern int uv_tcp_non_ifs_lsp_ipv6;
View
59 src/win/pipe.c
@@ -42,8 +42,9 @@ static const int64_t eof_timeout = 50; /* ms */
static const int default_pending_pipe_instances = 4;
/* IPC protocol flags. */
-#define UV_IPC_RAW_DATA 0x0001
-#define UV_IPC_UV_STREAM 0x0002
+#define UV_IPC_RAW_DATA 0x0001
+#define UV_IPC_TCP_SERVER 0x0002
+#define UV_IPC_TCP_CONNECTION 0x0004
/* IPC frame header. */
typedef struct {
@@ -79,7 +80,8 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
handle->name = NULL;
handle->ipc_pid = 0;
handle->remaining_ipc_rawdata_bytes = 0;
- handle->pending_socket_info = NULL;
+ handle->pending_ipc_info.socket_info = NULL;
+ handle->pending_ipc_info.tcp_connection = 0;
handle->ipc = ipc;
handle->non_overlapped_writes_tail = NULL;
@@ -356,9 +358,9 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
handle->flags |= UV_HANDLE_CLOSED;
if (handle->flags & UV_HANDLE_CONNECTION) {
- if (handle->pending_socket_info) {
- free(handle->pending_socket_info);
- handle->pending_socket_info = NULL;
+ if (handle->pending_ipc_info.socket_info) {
+ free(handle->pending_ipc_info.socket_info);
+ handle->pending_ipc_info.socket_info = NULL;
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
@@ -711,13 +713,14 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
uv_pipe_accept_t* req;
if (server->ipc) {
- if (!server->pending_socket_info) {
+ if (!server->pending_ipc_info.socket_info) {
/* No valid pending sockets. */
uv__set_sys_error(loop, WSAEWOULDBLOCK);
return -1;
}
- return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info);
+ return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info,
+ server->pending_ipc_info.tcp_connection);
} else {
pipe_client = (uv_pipe_t*)client;
@@ -1051,9 +1054,8 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
return -1;
}
- /* Only TCP server handles are supported for sharing. */
- if (send_handle && (send_handle->type != UV_TCP ||
- send_handle->flags & UV_HANDLE_CONNECTION)) {
+ /* Only TCP handles are supported for sharing. */
+ if (send_handle && send_handle->type != UV_TCP) {
uv__set_artificial_error(loop, UV_ENOTSUP);
return -1;
}
@@ -1091,7 +1093,11 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
&ipc_frame.socket_info)) {
return -1;
}
- ipc_frame.header.flags |= UV_IPC_UV_STREAM;
+ ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
+
+ if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
+ ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
+ }
}
if (bufcnt == 1) {
@@ -1132,7 +1138,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
result = WriteFile(handle->handle,
&ipc_frame,
- ipc_frame.header.flags & UV_IPC_UV_STREAM ?
+ ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
sizeof(ipc_frame) : sizeof(ipc_frame.header),
NULL,
&ipc_header_req->overlapped);
@@ -1146,7 +1152,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
ipc_header_req->queued_bytes = 0;
} else {
/* Request queued by the kernel. */
- ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_UV_STREAM ?
+ ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
sizeof(ipc_frame) : sizeof(ipc_frame.header);
handle->write_queue_size += req->queued_bytes;
}
@@ -1330,9 +1336,10 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
}
assert(bytes == sizeof(ipc_frame.header));
- assert(ipc_frame.header.flags <= (UV_IPC_UV_STREAM | UV_IPC_RAW_DATA));
+ assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
+ UV_IPC_TCP_CONNECTION));
- if (ipc_frame.header.flags & UV_IPC_UV_STREAM) {
+ if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
assert(avail - sizeof(ipc_frame.header) >=
sizeof(ipc_frame.socket_info));
@@ -1350,14 +1357,16 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
/* Store the pending socket info. */
- assert(!handle->pending_socket_info);
- handle->pending_socket_info =
- (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_socket_info)));
- if (!handle->pending_socket_info) {
+ assert(!handle->pending_ipc_info.socket_info);
+ handle->pending_ipc_info.socket_info =
+ (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info)));
+ if (!handle->pending_ipc_info.socket_info) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
- *(handle->pending_socket_info) = ipc_frame.socket_info;
+ *(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info;
+ handle->pending_ipc_info.tcp_connection =
+ ipc_frame.header.flags & UV_IPC_TCP_CONNECTION;
}
if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
@@ -1385,14 +1394,14 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
handle->remaining_ipc_rawdata_bytes - bytes;
if (handle->read2_cb) {
handle->read2_cb(handle, bytes, buf,
- handle->pending_socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
+ handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
} else if (handle->read_cb) {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
}
- if (handle->pending_socket_info) {
- free(handle->pending_socket_info);
- handle->pending_socket_info = NULL;
+ if (handle->pending_ipc_info.socket_info) {
+ free(handle->pending_ipc_info.socket_info);
+ handle->pending_ipc_info.socket_info = NULL;
}
} else {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
View
4 src/win/req.c
@@ -167,6 +167,10 @@ void uv_process_reqs(uv_loop_t* loop) {
uv_process_fs_event_req(loop, req, (uv_fs_event_t*) req->data);
break;
+ case UV_TCP_CLOSE:
+ uv_tcp_process_close(loop, (uv_tcp_t*) req->data);
+ break;
+
default:
assert(0);
}
View
215 src/win/tcp.c
@@ -218,6 +218,18 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
handle->accept_reqs = NULL;
}
+ if (handle->flags & UV_HANDLE_CONNECTION &&
+ handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
+ UnregisterWait(handle->read_req.wait_handle);
+ handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
+ }
+ if (handle->read_req.event_handle) {
+ CloseHandle(handle->read_req.event_handle);
+ handle->read_req.event_handle = NULL;
+ }
+ }
+
if (handle->close_cb) {
handle->close_cb((uv_handle_t*)handle);
}
@@ -415,6 +427,13 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
buf.len = 0;
}
+ /* Prepare the overlapped structure. */
+ memset(&(req->overlapped), 0, sizeof(req->overlapped));
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ assert(req->event_handle);
+ req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1);
+ }
+
flags = 0;
result = WSARecv(handle->socket,
(WSABUF*)&buf,
@@ -434,6 +453,14 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
/* The req will be processed with IOCP. */
handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
+ req->wait_handle == INVALID_HANDLE_VALUE &&
+ !RegisterWaitForSingleObject(&req->wait_handle,
+ req->overlapped.hEvent, post_completion, (void*) req,
+ INFINITE, WT_EXECUTEINWAITTHREAD)) {
+ SET_REQ_ERROR(req, GetLastError());
+ uv_insert_pending_req(loop, (uv_req_t*)req);
+ }
} else {
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, WSAGetLastError());
@@ -466,7 +493,7 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
}
}
- if (!(handle->flags & UV_HANDLE_SHARED_TCP_SERVER) &&
+ if (!(handle->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
listen(handle->socket, backlog) == SOCKET_ERROR) {
uv__set_sys_error(loop, WSAGetLastError());
return -1;
@@ -593,10 +620,18 @@ int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb,
handle->read_cb = read_cb;
handle->alloc_cb = alloc_cb;
- /* If reading was stopped and then started again, there could stell be a */
+ /* If reading was stopped and then started again, there could still be a */
/* read request pending. */
- if (!(handle->flags & UV_HANDLE_READ_PENDING))
+ if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
+ !handle->read_req.event_handle) {
+ handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
+ if (!handle->read_req.event_handle) {
+ uv_fatal_error(GetLastError(), "CreateEvent");
+ }
+ }
uv_tcp_queue_read(loop, handle);
+ }
return 0;
}
@@ -790,6 +825,16 @@ int uv_tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle,
req->cb = cb;
memset(&req->overlapped, 0, sizeof(req->overlapped));
+ /* Prepare the overlapped structure. */
+ memset(&(req->overlapped), 0, sizeof(req->overlapped));
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ req->event_handle = CreateEvent(NULL, 0, 0, NULL);
+ if (!req->event_handle) {
+ uv_fatal_error(GetLastError(), "CreateEvent");
+ }
+ req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1);
+ }
+
result = WSASend(handle->socket,
(WSABUF*)bufs,
bufcnt,
@@ -812,6 +857,14 @@ int uv_tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle,
handle->write_reqs_pending++;
handle->write_queue_size += req->queued_bytes;
uv_ref(loop);
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
+ req->wait_handle == INVALID_HANDLE_VALUE &&
+ !RegisterWaitForSingleObject(&req->wait_handle,
+ req->overlapped.hEvent, post_completion, (void*) req,
+ INFINITE, WT_EXECUTEINWAITTHREAD)) {
+ SET_REQ_ERROR(req, GetLastError());
+ uv_insert_pending_req(loop, (uv_req_t*)req);
+ }
} else {
/* Send failed due to an error. */
uv__set_sys_error(loop, WSAGetLastError());
@@ -945,6 +998,17 @@ void uv_process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
assert(handle->write_queue_size >= req->queued_bytes);
handle->write_queue_size -= req->queued_bytes;
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ if (req->wait_handle != INVALID_HANDLE_VALUE) {
+ UnregisterWait(req->wait_handle);
+ req->wait_handle = INVALID_HANDLE_VALUE;
+ }
+ if (req->event_handle) {
+ CloseHandle(req->event_handle);
+ req->event_handle = NULL;
+ }
+ }
+
if (req->cb) {
uv__set_sys_error(loop, GET_REQ_SOCK_ERROR(req));
((uv_write_cb)req->cb)(req, loop->last_err.code == UV_OK ? 0 : -1);
@@ -1036,7 +1100,8 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
}
-int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) {
+int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,
+ int tcp_connection) {
SOCKET socket = WSASocketW(AF_INET,
SOCK_STREAM,
IPPROTO_IP,
@@ -1050,13 +1115,22 @@ int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) {
}
tcp->flags |= UV_HANDLE_BOUND;
- tcp->flags |= UV_HANDLE_SHARED_TCP_SERVER;
+ tcp->flags |= UV_HANDLE_SHARED_TCP_SOCKET;
+
+ if (tcp_connection) {
+ uv_connection_init((uv_stream_t*)tcp);
+ }
if (socket_protocol_info->iAddressFamily == AF_INET6) {
tcp->flags |= UV_HANDLE_IPV6;
}
- return uv_tcp_set_socket(tcp->loop, tcp, socket, 1);
+ if (uv_tcp_set_socket(tcp->loop, tcp, socket, 1) != 0) {
+ return -1;
+ }
+
+ tcp->loop->active_tcp_streams++;
+ return 0;
}
@@ -1097,28 +1171,26 @@ int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
LPWSAPROTOCOL_INFOW protocol_info) {
- assert(!(handle->flags & UV_HANDLE_CONNECTION));
-
- /*
- * We're about to share the socket with another process. Because
- * this is a listening socket, we assume that the other process will
- * be accepting connections on it. So, before sharing the socket
- * with another process, we call listen here in the parent process.
- * This needs to be modified if the socket is shared with
- * another process for anything other than accepting connections.
- */
-
- if (!(handle->flags & UV_HANDLE_LISTENING)) {
- if (!(handle->flags & UV_HANDLE_BOUND)) {
- uv__set_artificial_error(handle->loop, UV_EINVAL);
- return -1;
- }
- if (listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
- uv__set_sys_error(handle->loop, WSAGetLastError());
- return -1;
+ if (!(handle->flags & UV_HANDLE_CONNECTION)) {
+ /*
+ * We're about to share the socket with another process. Because
+ * this is a listening socket, we assume that the other process will
+ * be accepting connections on it. So, before sharing the socket
+ * with another process, we call listen here in the parent process.
+ * This needs to be modified if the socket is shared with
+ * another process for anything other than accepting connections.
+ */
+
+ if (!(handle->flags & UV_HANDLE_LISTENING)) {
+ if (!(handle->flags & UV_HANDLE_BOUND)) {
+ uv__set_artificial_error(handle->loop, UV_EINVAL);
+ return -1;
+ }
+ if (listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
+ uv__set_sys_error(handle->loop, WSAGetLastError());
+ return -1;
+ }
}
-
- handle->flags |= UV_HANDLE_SHARED_TCP_SERVER;
}
if (WSADuplicateSocketW(handle->socket, pid, protocol_info)) {
@@ -1126,6 +1198,8 @@ int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
return -1;
}
+ handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET;
+
return 0;
}
@@ -1161,4 +1235,89 @@ int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
}
return 0;
-}
+}
+
+
+static DWORD WINAPI tcp_close_thread_proc(void* parameter) {
+ int errno;
+ DWORD no = 0;
+ struct linger l = {1, 1};
+ uv_tcp_t* tcp = (uv_tcp_t*)parameter;
+
+ /* Set linger on */
+ setsockopt(tcp->socket, SOL_SOCKET, SO_LINGER, (const char*)&l, sizeof l);
+
+ /* Change to blocking mode */
+ ioctlsocket(tcp->socket, FIONBIO, &no);
+
+ /*
+ * Close the socket. This will block until the graceful close is done
+ * or until 1 sec timeout expires.
+ */
+ closesocket(tcp->socket);
+
+ POST_COMPLETION_FOR_REQ(tcp->loop, &tcp->close_req);
+ return 0;
+}
+
+
+void uv_tcp_process_close(uv_loop_t* loop, uv_tcp_t* tcp) {
+ tcp->reqs_pending--;
+ if (tcp->reqs_pending == 0) {
+ uv_want_endgame(tcp->loop, (uv_handle_t*)tcp);
+ }
+}
+
+
+void uv_tcp_close(uv_tcp_t* tcp) {
+ int no_lsps;
+
+ /*
+ * In order for winsock to do a graceful close there must not be
+ * any pending reads.
+ */
+ if (tcp->flags & UV_HANDLE_READ_PENDING) {
+ /* Check if we have any LSPs stacked on top of TCP */
+ no_lsps = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_no_lsps_ipv6 :
+ uv_tcp_no_lsps_ipv4;
+ if (no_lsps) {
+ /* No LSPs, safe to cancel the pending read with CancelIo */
+ CancelIo((HANDLE)tcp->socket);
+ } else if (!(tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET)) {
+ /*
+ * shutdown ensures that the socket is gracefully closed
+ * (with pending reads). It's safe to do shutdown here because
+ * we're closing non-shared connection socket.
+ */
+ shutdown(tcp->socket, SD_SEND);
+ tcp->flags |= UV_HANDLE_SHUT;
+ } else {
+ /*
+ * Shared connection socket with pending read. We can't call shutdown
+ * because the connection might still be used from another process.
+ * Instead we turn linger on and call blocking closesocket
+ * on a thread.
+ */
+ tcp->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING);
+
+ uv_req_init(tcp->loop, (uv_req_t*)&tcp->close_req);
+ tcp->close_req.type = UV_TCP_CLOSE;
+ tcp->close_req.data = tcp;
+
+ if (!QueueUserWorkItem(tcp_close_thread_proc,
+ tcp,
+ WT_EXECUTELONGFUNCTION)) {
+ uv_fatal_error(GetLastError(), "QueueUserWorkItem");
+ }
+
+ tcp->reqs_pending++;
+ return;
+ }
+ }
+
+ tcp->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING);
+ closesocket(tcp->socket);
+ if (tcp->reqs_pending == 0) {
+ uv_want_endgame(tcp->loop, (uv_handle_t*)tcp);
+ }
+}
View
12 src/win/winsock.c
@@ -28,6 +28,10 @@
/* Whether ipv6 is supported */
int uv_allow_ipv6;
+/* Whether there are any LSPs stacked on TCP */
+int uv_tcp_no_lsps_ipv4;
+int uv_tcp_no_lsps_ipv6;
+
/* Whether there are any non-IFS LSPs stacked on TCP */
int uv_tcp_non_ifs_lsp_ipv4;
int uv_tcp_non_ifs_lsp_ipv6;
@@ -117,6 +121,10 @@ void uv_winsock_init() {
uv_tcp_non_ifs_lsp_ipv4 = 1;
}
+ if (protocol_info.ProtocolChain.ChainLen == 1) {
+ uv_tcp_no_lsps_ipv4 = 1;
+ }
+
if (closesocket(dummy) == SOCKET_ERROR) {
uv_fatal_error(WSAGetLastError(), "closesocket");
}
@@ -139,6 +147,10 @@ void uv_winsock_init() {
uv_tcp_non_ifs_lsp_ipv6 = 1;
}
+ if (protocol_info.ProtocolChain.ChainLen == 1) {
+ uv_tcp_no_lsps_ipv6 = 1;
+ }
+
if (closesocket(dummy) == SOCKET_ERROR) {
uv_fatal_error(WSAGetLastError(), "closesocket");
}
View
171 test/run-tests.c
@@ -61,29 +61,85 @@ static uv_pipe_t stdin_pipe;
static uv_pipe_t stdout_pipe;
static int on_pipe_read_called;
static int after_write_called;
+static int tcp_conn_read_cb_called;
+static int tcp_conn_write_cb_called;
+struct {
+ uv_connect_t conn_req;
+ uv_write_t tcp_write_req;
+ uv_tcp_t conn;
+} tcp_conn;
static void close_cb(uv_handle_t* handle) {
close_cb_called++;
}
-static void close_conn_cb(uv_handle_t* handle) {
- free(handle);
- close_cb_called++;
+void conn_notify_write_cb(uv_write_t* req, int status) {
+ uv_close((uv_handle_t*)&tcp_server, close_cb);
+ uv_close((uv_handle_t*)&channel, close_cb);
}
-void conn_notify_write_cb(uv_write_t* req, int status) {
- uv_close((uv_handle_t*)&tcp_server, close_cb);
+void tcp_connection_write_cb(uv_write_t* req, int status) {
+ ASSERT((uv_handle_t*)&tcp_conn.conn == (uv_handle_t*)req->handle);
+ uv_close((uv_handle_t*)req->handle, close_cb);
uv_close((uv_handle_t*)&channel, close_cb);
+ uv_close((uv_handle_t*)&tcp_server, close_cb);
+ tcp_conn_write_cb_called++;
+}
+
+
+void on_tcp_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
+ uv_buf_t outbuf;
+ int r;
+
+ if (nread < 0) {
+ if (uv_last_error(tcp->loop).code == UV_EOF) {
+ free(buf.base);
+ return;
+ }
+
+ printf("error recving on tcp connection: %s\n",
+ uv_strerror(uv_last_error(tcp->loop)));
+ abort();
+ }
+
+ ASSERT(nread > 0);
+ ASSERT(memcmp("world\n", buf.base, nread) == 0);
+ on_pipe_read_called++;
+ free(buf.base);
+
+ /* Write to the socket */
+ outbuf = uv_buf_init("hello again\n", 12);
+ r = uv_write(&tcp_conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
+ ASSERT(r == 0);
+
+ tcp_conn_read_cb_called++;
+}
+
+
+static uv_buf_t on_read_alloc(uv_handle_t* handle,
+ size_t suggested_size) {
+ uv_buf_t buf;
+ buf.base = (char*)malloc(suggested_size);
+ buf.len = suggested_size;
+ return buf;
+}
+
+
+static void connect_cb(uv_connect_t* req, int status) {
+ int r;
+
+ ASSERT(status == 0);
+ r = uv_read_start(req->handle, on_read_alloc, on_tcp_read);
+ ASSERT(r == 0);
}
static void ipc_on_connection(uv_stream_t* server, int status) {
int r;
uv_buf_t buf;
- uv_tcp_t* conn;
if (!connection_accepted) {
/*
@@ -93,16 +149,13 @@ static void ipc_on_connection(uv_stream_t* server, int status) {
ASSERT(status == 0);
ASSERT((uv_stream_t*)&tcp_server == server);
- conn = malloc(sizeof(*conn));
- ASSERT(conn);
-
- r = uv_tcp_init(server->loop, conn);
+ r = uv_tcp_init(server->loop, &tcp_conn.conn);
ASSERT(r == 0);
- r = uv_accept(server, (uv_stream_t*)conn);
+ r = uv_accept(server, (uv_stream_t*)&tcp_conn.conn);
ASSERT(r == 0);
- uv_close((uv_handle_t*)conn, close_conn_cb);
+ uv_close((uv_handle_t*)&tcp_conn.conn, close_cb);
buf = uv_buf_init("accepted_connection\n", 20);
r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
@@ -114,12 +167,40 @@ static void ipc_on_connection(uv_stream_t* server, int status) {
}
+static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
+ int r;
+ uv_buf_t buf;
+ uv_tcp_t* conn;
+
+ ASSERT(status == 0);
+ ASSERT((uv_stream_t*)&tcp_server == server);
+
+ conn = malloc(sizeof(*conn));
+ ASSERT(conn);
+
+ r = uv_tcp_init(server->loop, conn);
+ ASSERT(r == 0);
+
+ r = uv_accept(server, (uv_stream_t*)conn);
+ ASSERT(r == 0);
+
+ /* Send the accepted connection to the other process */
+ buf = uv_buf_init("hello\n", 6);
+ r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
+ (uv_stream_t*)conn, NULL);
+ ASSERT(r == 0);
+
+ r = uv_read_start((uv_stream_t*)conn, on_read_alloc, on_tcp_read);
+ ASSERT(r == 0);
+
+ uv_close((uv_handle_t*)conn, close_cb);
+}
+
+
static int ipc_helper(int listen_after_write) {
/*
* This is launched from test-ipc.c. stdin is a duplex channel that we
- * over which a handle will be transmitted. In this initial version only
- * data is transfered over the channel. XXX edit this comment after handle
- * transfer is added.
+ * over which a handle will be transmitted.
*/
uv_write_t write_req;
@@ -165,6 +246,51 @@ static int ipc_helper(int listen_after_write) {
}
+static int ipc_helper_tcp_connection() {
+ /*
+ * This is launched from test-ipc.c. stdin is a duplex channel that we
+ * over which a handle will be transmitted.
+ */
+
+ int r;
+ struct sockaddr_in addr;
+
+ r = uv_pipe_init(uv_default_loop(), &channel, 1);
+ ASSERT(r == 0);
+
+ uv_pipe_open(&channel, 0);
+
+ ASSERT(uv_is_readable((uv_stream_t*)&channel));
+ ASSERT(uv_is_writable((uv_stream_t*)&channel));
+
+ r = uv_tcp_init(uv_default_loop(), &tcp_server);
+ ASSERT(r == 0);
+
+ r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT));
+ ASSERT(r == 0);
+
+ r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection_tcp_conn);
+ ASSERT(r == 0);
+
+ /* Make a connection to the server */
+ r = uv_tcp_init(uv_default_loop(), &tcp_conn.conn);
+ ASSERT(r == 0);
+
+ addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
+ r = uv_tcp_connect(&tcp_conn.conn_req, (uv_tcp_t*)&tcp_conn.conn, addr, connect_cb);
+ ASSERT(r == 0);
+
+ r = uv_run(uv_default_loop());
+ ASSERT(r == 0);
+
+ ASSERT(tcp_conn_read_cb_called == 1);
+ ASSERT(tcp_conn_write_cb_called == 1);
+ ASSERT(close_cb_called == 4);
+
+ return 0;
+}
+
+
void on_pipe_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
ASSERT(nread > 0);
ASSERT(memcmp("hello world\n", buf.base, nread) == 0);
@@ -177,15 +303,6 @@ void on_pipe_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
}
-static uv_buf_t on_pipe_read_alloc(uv_handle_t* handle,
- size_t suggested_size) {
- uv_buf_t buf;
- buf.base = (char*)malloc(suggested_size);
- buf.len = suggested_size;
- return buf;
-}
-
-
static void after_pipe_write(uv_write_t* req, int status) {
ASSERT(status == 0);
after_write_called++;
@@ -243,7 +360,7 @@ static int stdio_over_pipes_helper() {
uv_ref(loop);
uv_ref(loop);
- r = uv_read_start((uv_stream_t*)&stdin_pipe, on_pipe_read_alloc,
+ r = uv_read_start((uv_stream_t*)&stdin_pipe, on_read_alloc,
on_pipe_read);
ASSERT(r == 0);
@@ -276,6 +393,10 @@ static int maybe_run_test(int argc, char **argv) {
return ipc_send_recv_helper();
}
+ if (strcmp(argv[1], "ipc_helper_tcp_connection") == 0) {
+ return ipc_helper_tcp_connection();
+ }
+
if (strcmp(argv[1], "stdio_over_pipes_helper") == 0) {
return stdio_over_pipes_helper();
}
View
114 test/test-ipc.c
@@ -27,9 +27,12 @@
static uv_pipe_t channel;
static uv_tcp_t tcp_server;
+static uv_tcp_t tcp_connection;
static int exit_cb_called;
static int read2_cb_called;
+static int tcp_write_cb_called;
+static int tcp_read_cb_called;
static int local_conn_accepted;
static int remote_conn_accepted;
static int tcp_server_listening;
@@ -214,32 +217,127 @@ void spawn_helper(uv_pipe_t* channel,
}
-static int run_ipc_test(const char* helper) {
+static void on_tcp_write(uv_write_t* req, int status) {
+ ASSERT(status == 0);
+ ASSERT(req->handle == (uv_stream_t*)&tcp_connection);
+ tcp_write_cb_called++;
+}
+
+
+static uv_buf_t on_read_alloc(uv_handle_t* handle, size_t suggested_size) {
+ uv_buf_t buf;
+ buf.base = (char*)malloc(suggested_size);
+ buf.len = suggested_size;
+ return buf;
+}
+
+
+static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
+ ASSERT(nread > 0);
+ ASSERT(memcmp("hello again\n", buf.base, nread) == 0);
+ ASSERT(tcp == (uv_stream_t*)&tcp_connection);
+ free(buf.base);
+
+ tcp_read_cb_called++;
+
+ uv_close((uv_handle_t*)tcp, NULL);
+ uv_close((uv_handle_t*)&channel, NULL);
+}
+
+
+static void on_read_connection(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf,
+ uv_handle_type pending) {
+ int r;
+ uv_buf_t outbuf;
+ uv_err_t err;
+
+ if (nread == 0) {
+ /* Everything OK, but nothing read. */
+ free(buf.base);
+ return;
+ }
+
+ if (nread < 0) {
+ err = uv_last_error(pipe->loop);
+ if (err.code == UV_EOF) {
+ free(buf.base);
+ return;
+ }
+
+ printf("error recving on channel: %s\n", uv_strerror(err));
+ abort();
+ }
+
+ fprintf(stderr, "got %d bytes\n", (int)nread);
+
+ ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE);
+ read2_cb_called++;
+
+ /* Accept the pending TCP connection */
+ ASSERT(pending == UV_TCP);
+ r = uv_tcp_init(uv_default_loop(), &tcp_connection);
+ ASSERT(r == 0);
+
+ r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_connection);
+ ASSERT(r == 0);
+
+ /* Make sure that the expected data is correctly multiplexed. */
+ ASSERT(memcmp("hello\n", buf.base, nread) == 0);
+
+ /* Write/read to/from the connection */
+ outbuf = uv_buf_init("world\n", 6);
+ r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1,
+ on_tcp_write);
+ ASSERT(r == 0);
+
+ r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read);
+ ASSERT(r == 0);
+
+ free(buf.base);
+}
+
+
+static int run_ipc_test(const char* helper, uv_read2_cb read_cb) {
uv_process_t process;
int r;
spawn_helper(&channel, &process, helper);
- uv_read2_start((uv_stream_t*)&channel, on_alloc, on_read);
+ uv_read2_start((uv_stream_t*)&channel, on_alloc, read_cb);
r = uv_run(uv_default_loop());
ASSERT(r == 0);
+ return 0;
+}
+
+
+TEST_IMPL(ipc_listen_before_write) {
+ int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
ASSERT(local_conn_accepted == 1);
ASSERT(remote_conn_accepted == 1);
ASSERT(read2_cb_called == 1);
ASSERT(exit_cb_called == 1);
-
- return 0;
+ return r;
}
-TEST_IMPL(ipc_listen_before_write) {
- return run_ipc_test("ipc_helper_listen_before_write");
+TEST_IMPL(ipc_listen_after_write) {
+ int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
+ ASSERT(local_conn_accepted == 1);
+ ASSERT(remote_conn_accepted == 1);
+ ASSERT(read2_cb_called == 1);
+ ASSERT(exit_cb_called == 1);
+ return r;
}
-TEST_IMPL(ipc_listen_after_write) {
- return run_ipc_test("ipc_helper_listen_after_write");
+TEST_IMPL(ipc_tcp_connection) {
+ int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
+ ASSERT(read2_cb_called == 1);
+ ASSERT(tcp_write_cb_called == 1);
+ ASSERT(tcp_read_cb_called == 1);
+ ASSERT(exit_cb_called == 1);
+ return r;
}
View
2  test/test-list.h
@@ -26,6 +26,7 @@ TEST_DECLARE (ipc_listen_before_write)
TEST_DECLARE (ipc_listen_after_write)
TEST_DECLARE (ipc_send_recv_pipe)
TEST_DECLARE (ipc_send_recv_tcp)
+TEST_DECLARE (ipc_tcp_connection)
TEST_DECLARE (tcp_ping_pong)
TEST_DECLARE (tcp_ping_pong_v6)
TEST_DECLARE (pipe_ping_pong)
@@ -180,6 +181,7 @@ TASK_LIST_START
TEST_ENTRY (ipc_listen_after_write)
TEST_ENTRY (ipc_send_recv_pipe)
TEST_ENTRY (ipc_send_recv_tcp)
+ TEST_ENTRY (ipc_tcp_connection)
TEST_ENTRY (tcp_ping_pong)
TEST_HELPER (tcp_ping_pong, tcp4_echo_server)
Please sign in to comment.
Something went wrong with that request. Please try again.