Skip to content
Browse files

remove linger

  • Loading branch information...
1 parent 74a773d commit 41ddc95d9602da7e1fd5df131013d0be96f92c0a Igor Zinkovsky committed
Showing with 360 additions and 425 deletions.
  1. +2 −6 include/uv-private/uv-win.h
  2. +1 −7 src/win/internal.h
  3. +0 −4 src/win/req.c
  4. +25 −69 src/win/tcp.c
  5. +0 −12 src/win/winsock.c
  6. +5 −324 test/run-tests.c
  7. +228 −3 test/test-ipc.c
  8. +99 −0 test/test-stdio-over-pipes.c
View
8 include/uv-private/uv-win.h
@@ -214,8 +214,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
UV_PROCESS_EXIT, \
UV_PROCESS_CLOSE, \
UV_UDP_RECV, \
- UV_FS_EVENT_REQ, \
- UV_TCP_CLOSE
+ UV_FS_EVENT_REQ
#define UV_REQ_PRIVATE_FIELDS \
union { \
@@ -287,10 +286,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
#define uv_tcp_connection_fields \
uv_buf_t read_buffer; \
- LPFN_CONNECTEX func_connectex; \
- struct uv_tcp_close_s { \
- UV_REQ_FIELDS \
- } close_req;
+ LPFN_CONNECTEX func_connectex;
#define UV_TCP_PRIVATE_FIELDS \
SOCKET socket; \
View
8 src/win/internal.h
@@ -73,6 +73,7 @@ void uv_process_timers(uv_loop_t* loop);
#define UV_HANDLE_TCP_KEEPALIVE 0x04000000
#define UV_HANDLE_TCP_SINGLE_ACCEPT 0x08000000
#define UV_HANDLE_TCP_ACCEPT_STATE_CHANGING 0x10000000
+#define UV_HANDLE_TCP_SOCKET_CLOSED 0x20000000
void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle);
void uv_process_endgames(uv_loop_t* loop);
@@ -149,9 +150,6 @@ int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,
int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
LPWSAPROTOCOL_INFOW protocol_info);
-void uv_tcp_process_close(uv_loop_t* loop, uv_tcp_t* tcp);
-void uv_tcp_close(uv_tcp_t* tcp);
-
/*
* UDP
@@ -341,10 +339,6 @@ int WSAAPI uv_wsarecvfrom_workaround(SOCKET socket, WSABUF* buffers,
/* Whether ipv6 is supported */
extern int uv_allow_ipv6;
-/* Whether there are any LSPs stacked on TCP */
-extern int uv_tcp_no_lsps_ipv4;
-extern int uv_tcp_no_lsps_ipv6;
-
/* Whether there are any non-IFS LSPs stacked on TCP */
extern int uv_tcp_non_ifs_lsp_ipv4;
extern int uv_tcp_non_ifs_lsp_ipv6;
View
4 src/win/req.c
@@ -167,10 +167,6 @@ void uv_process_reqs(uv_loop_t* loop) {
uv_process_fs_event_req(loop, req, (uv_fs_event_t*) req->data);
break;
- case UV_TCP_CLOSE:
- uv_tcp_process_close(loop, (uv_tcp_t*) req->data);
- break;
-
default:
assert(0);
}
View
94 src/win/tcp.c
@@ -199,6 +199,11 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
handle->flags |= UV_HANDLE_CLOSED;
+ if (!(handle->flags & UV_HANDLE_TCP_SOCKET_CLOSED)) {
+ closesocket(handle->socket);
+ handle->flags |= UV_HANDLE_TCP_SOCKET_CLOSED;
+ }
+
if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) {
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
for (i = 0; i < uv_simultaneous_server_accepts; i++) {
@@ -353,7 +358,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
/* Prepare the overlapped structure. */
memset(&(req->overlapped), 0, sizeof(req->overlapped));
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
- req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1);
+ req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
}
success = handle->func_acceptex(handle->socket,
@@ -431,7 +436,7 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
memset(&(req->overlapped), 0, sizeof(req->overlapped));
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
assert(req->event_handle);
- req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1);
+ req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
}
flags = 0;
@@ -832,7 +837,7 @@ int uv_tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle,
if (!req->event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
- req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1);
+ req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
}
result = WSASend(handle->socket,
@@ -1177,8 +1182,6 @@ int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
* this is a listening socket, we assume that the other process will
* be accepting connections on it. So, before sharing the socket
* with another process, we call listen here in the parent process.
- * This needs to be modified if the socket is shared with
- * another process for anything other than accepting connections.
*/
if (!(handle->flags & UV_HANDLE_LISTENING)) {
@@ -1238,85 +1241,38 @@ int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
}
-static DWORD WINAPI tcp_close_thread_proc(void* parameter) {
- int errno;
- DWORD no = 0;
- struct linger l = {1, 1};
- uv_tcp_t* tcp = (uv_tcp_t*)parameter;
-
- /* Set linger on */
- setsockopt(tcp->socket, SOL_SOCKET, SO_LINGER, (const char*)&l, sizeof l);
-
- /* Change to blocking mode */
- ioctlsocket(tcp->socket, FIONBIO, &no);
-
- /*
- * Close the socket. This will block until the graceful close is done
- * or until 1 sec timeout expires.
- */
- closesocket(tcp->socket);
-
- POST_COMPLETION_FOR_REQ(tcp->loop, &tcp->close_req);
- return 0;
-}
-
-
-void uv_tcp_process_close(uv_loop_t* loop, uv_tcp_t* tcp) {
- tcp->reqs_pending--;
- if (tcp->reqs_pending == 0) {
- uv_want_endgame(tcp->loop, (uv_handle_t*)tcp);
- }
-}
-
-
void uv_tcp_close(uv_tcp_t* tcp) {
- int no_lsps;
+ int non_ifs_lsp;
+ int close_socket = 1;
/*
* In order for winsock to do a graceful close there must not be
* any pending reads.
*/
if (tcp->flags & UV_HANDLE_READ_PENDING) {
- /* Check if we have any LSPs stacked on top of TCP */
- no_lsps = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_no_lsps_ipv6 :
- uv_tcp_no_lsps_ipv4;
- if (no_lsps) {
- /* No LSPs, safe to cancel the pending read with CancelIo */
- CancelIo((HANDLE)tcp->socket);
- } else if (!(tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET)) {
+ /* Check if we have any non-IFS LSPs stacked on top of TCP */
+ non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 :
+ uv_tcp_non_ifs_lsp_ipv4;
+ if (!non_ifs_lsp && (tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET)) {
/*
- * shutdown ensures that the socket is gracefully closed
- * (with pending reads). It's safe to do shutdown here because
- * we're closing non-shared connection socket.
+ * Shared socket with no non-IFS LSPs, request to cancel pending I/O.
+ * The socket will be closed inside endgame.
*/
+ CancelIo((HANDLE)tcp->socket);
+ close_socket = 0;
+ } else {
shutdown(tcp->socket, SD_SEND);
tcp->flags |= UV_HANDLE_SHUT;
- } else {
- /*
- * Shared connection socket with pending read. We can't call shutdown
- * because the connection might still be used from another process.
- * Instead we turn linger on and call blocking closesocket
- * on a thread.
- */
- tcp->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING);
-
- uv_req_init(tcp->loop, (uv_req_t*)&tcp->close_req);
- tcp->close_req.type = UV_TCP_CLOSE;
- tcp->close_req.data = tcp;
-
- if (!QueueUserWorkItem(tcp_close_thread_proc,
- tcp,
- WT_EXECUTELONGFUNCTION)) {
- uv_fatal_error(GetLastError(), "QueueUserWorkItem");
- }
-
- tcp->reqs_pending++;
- return;
}
}
tcp->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING);
- closesocket(tcp->socket);
+
+ if (close_socket) {
+ closesocket(tcp->socket);
+ tcp->flags |= UV_HANDLE_TCP_SOCKET_CLOSED;
+ }
+
if (tcp->reqs_pending == 0) {
uv_want_endgame(tcp->loop, (uv_handle_t*)tcp);
}
View
12 src/win/winsock.c
@@ -28,10 +28,6 @@
/* Whether ipv6 is supported */
int uv_allow_ipv6;
-/* Whether there are any LSPs stacked on TCP */
-int uv_tcp_no_lsps_ipv4;
-int uv_tcp_no_lsps_ipv6;
-
/* Whether there are any non-IFS LSPs stacked on TCP */
int uv_tcp_non_ifs_lsp_ipv4;
int uv_tcp_non_ifs_lsp_ipv6;
@@ -121,10 +117,6 @@ void uv_winsock_init() {
uv_tcp_non_ifs_lsp_ipv4 = 1;
}
- if (protocol_info.ProtocolChain.ChainLen == 1) {
- uv_tcp_no_lsps_ipv4 = 1;
- }
-
if (closesocket(dummy) == SOCKET_ERROR) {
uv_fatal_error(WSAGetLastError(), "closesocket");
}
@@ -147,10 +139,6 @@ void uv_winsock_init() {
uv_tcp_non_ifs_lsp_ipv6 = 1;
}
- if (protocol_info.ProtocolChain.ChainLen == 1) {
- uv_tcp_no_lsps_ipv6 = 1;
- }
-
if (closesocket(dummy) == SOCKET_ERROR) {
uv_fatal_error(WSAGetLastError(), "closesocket");
}
View
329 test/run-tests.c
@@ -32,6 +32,11 @@
/* The time in milliseconds after which a single test times out. */
#define TEST_TIMEOUT 5000
+int ipc_helper(int listen_after_write);
+int ipc_helper_tcp_connection(void);
+int ipc_send_recv_helper(void);
+int stdio_over_pipes_helper(void);
+
static int maybe_run_test(int argc, char **argv);
@@ -51,329 +56,6 @@ int main(int argc, char **argv) {
}
-static uv_pipe_t channel;
-static uv_tcp_t tcp_server;
-static uv_write_t conn_notify_req;
-static int close_cb_called;
-static int connection_accepted;
-
-static uv_pipe_t stdin_pipe;
-static uv_pipe_t stdout_pipe;
-static int on_pipe_read_called;
-static int after_write_called;
-static int tcp_conn_read_cb_called;
-static int tcp_conn_write_cb_called;
-
-struct {
- uv_connect_t conn_req;
- uv_write_t tcp_write_req;
- uv_tcp_t conn;
-} tcp_conn;
-
-static void close_cb(uv_handle_t* handle) {
- close_cb_called++;
-}
-
-
-void conn_notify_write_cb(uv_write_t* req, int status) {
- uv_close((uv_handle_t*)&tcp_server, close_cb);
- uv_close((uv_handle_t*)&channel, close_cb);
-}
-
-
-void tcp_connection_write_cb(uv_write_t* req, int status) {
- ASSERT((uv_handle_t*)&tcp_conn.conn == (uv_handle_t*)req->handle);
- uv_close((uv_handle_t*)req->handle, close_cb);
- uv_close((uv_handle_t*)&channel, close_cb);
- uv_close((uv_handle_t*)&tcp_server, close_cb);
- tcp_conn_write_cb_called++;
-}
-
-
-void on_tcp_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
- uv_buf_t outbuf;
- int r;
-
- if (nread < 0) {
- if (uv_last_error(tcp->loop).code == UV_EOF) {
- free(buf.base);
- return;
- }
-
- printf("error recving on tcp connection: %s\n",
- uv_strerror(uv_last_error(tcp->loop)));
- abort();
- }
-
- ASSERT(nread > 0);
- ASSERT(memcmp("world\n", buf.base, nread) == 0);
- on_pipe_read_called++;
- free(buf.base);
-
- /* Write to the socket */
- outbuf = uv_buf_init("hello again\n", 12);
- r = uv_write(&tcp_conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
- ASSERT(r == 0);
-
- tcp_conn_read_cb_called++;
-}
-
-
-static uv_buf_t on_read_alloc(uv_handle_t* handle,
- size_t suggested_size) {
- uv_buf_t buf;
- buf.base = (char*)malloc(suggested_size);
- buf.len = suggested_size;
- return buf;
-}
-
-
-static void connect_cb(uv_connect_t* req, int status) {
- int r;
-
- ASSERT(status == 0);
- r = uv_read_start(req->handle, on_read_alloc, on_tcp_read);
- ASSERT(r == 0);
-}
-
-
-static void ipc_on_connection(uv_stream_t* server, int status) {
- int r;
- uv_buf_t buf;
-
- if (!connection_accepted) {
- /*
- * Accept the connection and close it. Also let the other
- * side know.
- */
- ASSERT(status == 0);
- ASSERT((uv_stream_t*)&tcp_server == server);
-
- r = uv_tcp_init(server->loop, &tcp_conn.conn);
- ASSERT(r == 0);
-
- r = uv_accept(server, (uv_stream_t*)&tcp_conn.conn);
- ASSERT(r == 0);
-
- uv_close((uv_handle_t*)&tcp_conn.conn, close_cb);
-
- buf = uv_buf_init("accepted_connection\n", 20);
- r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
- NULL, conn_notify_write_cb);
- ASSERT(r == 0);
-
- connection_accepted = 1;
- }
-}
-
-
-static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
- int r;
- uv_buf_t buf;
- uv_tcp_t* conn;
-
- ASSERT(status == 0);
- ASSERT((uv_stream_t*)&tcp_server == server);
-
- conn = malloc(sizeof(*conn));
- ASSERT(conn);
-
- r = uv_tcp_init(server->loop, conn);
- ASSERT(r == 0);
-
- r = uv_accept(server, (uv_stream_t*)conn);
- ASSERT(r == 0);
-
- /* Send the accepted connection to the other process */
- buf = uv_buf_init("hello\n", 6);
- r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
- (uv_stream_t*)conn, NULL);
- ASSERT(r == 0);
-
- r = uv_read_start((uv_stream_t*)conn, on_read_alloc, on_tcp_read);
- ASSERT(r == 0);
-
- uv_close((uv_handle_t*)conn, close_cb);
-}
-
-
-static int ipc_helper(int listen_after_write) {
- /*
- * This is launched from test-ipc.c. stdin is a duplex channel that we
- * over which a handle will be transmitted.
- */
-
- uv_write_t write_req;
- int r;
- uv_buf_t buf;
-
- r = uv_pipe_init(uv_default_loop(), &channel, 1);
- ASSERT(r == 0);
-
- uv_pipe_open(&channel, 0);
-
- ASSERT(uv_is_readable((uv_stream_t*) &channel));
- ASSERT(uv_is_writable((uv_stream_t*) &channel));
-
- r = uv_tcp_init(uv_default_loop(), &tcp_server);
- ASSERT(r == 0);
-
- r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT));
- ASSERT(r == 0);
-
- if (!listen_after_write) {
- r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
- ASSERT(r == 0);
- }
-
- buf = uv_buf_init("hello\n", 6);
- r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
- (uv_stream_t*)&tcp_server, NULL);
- ASSERT(r == 0);
-
- if (listen_after_write) {
- r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
- ASSERT(r == 0);
- }
-
- r = uv_run(uv_default_loop());
- ASSERT(r == 0);
-
- ASSERT(connection_accepted == 1);
- ASSERT(close_cb_called == 3);
-
- return 0;
-}
-
-
-static int ipc_helper_tcp_connection() {
- /*
- * This is launched from test-ipc.c. stdin is a duplex channel that we
- * over which a handle will be transmitted.
- */
-
- int r;
- struct sockaddr_in addr;
-
- r = uv_pipe_init(uv_default_loop(), &channel, 1);
- ASSERT(r == 0);
-
- uv_pipe_open(&channel, 0);
-
- ASSERT(uv_is_readable((uv_stream_t*)&channel));
- ASSERT(uv_is_writable((uv_stream_t*)&channel));
-
- r = uv_tcp_init(uv_default_loop(), &tcp_server);
- ASSERT(r == 0);
-
- r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT));
- ASSERT(r == 0);
-
- r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection_tcp_conn);
- ASSERT(r == 0);
-
- /* Make a connection to the server */
- r = uv_tcp_init(uv_default_loop(), &tcp_conn.conn);
- ASSERT(r == 0);
-
- addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
- r = uv_tcp_connect(&tcp_conn.conn_req, (uv_tcp_t*)&tcp_conn.conn, addr, connect_cb);
- ASSERT(r == 0);
-
- r = uv_run(uv_default_loop());
- ASSERT(r == 0);
-
- ASSERT(tcp_conn_read_cb_called == 1);
- ASSERT(tcp_conn_write_cb_called == 1);
- ASSERT(close_cb_called == 4);
-
- return 0;
-}
-
-
-void on_pipe_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
- ASSERT(nread > 0);
- ASSERT(memcmp("hello world\n", buf.base, nread) == 0);
- on_pipe_read_called++;
-
- free(buf.base);
-
- uv_close((uv_handle_t*)&stdin_pipe, close_cb);
- uv_close((uv_handle_t*)&stdout_pipe, close_cb);
-}
-
-
-static void after_pipe_write(uv_write_t* req, int status) {
- ASSERT(status == 0);
- after_write_called++;
-}
-
-
-static int stdio_over_pipes_helper() {
- /* Write several buffers to test that the write order is preserved. */
- char* buffers[] = {
- "he",
- "ll",
- "o ",
- "wo",
- "rl",
- "d",
- "\n"
- };
-
- uv_write_t write_req[ARRAY_SIZE(buffers)];
- uv_buf_t buf[ARRAY_SIZE(buffers)];
- int r, i;
- uv_loop_t* loop = uv_default_loop();
-
- ASSERT(UV_NAMED_PIPE == uv_guess_handle(0));
- ASSERT(UV_NAMED_PIPE == uv_guess_handle(1));
-
- r = uv_pipe_init(loop, &stdin_pipe, 0);
- ASSERT(r == 0);
- r = uv_pipe_init(loop, &stdout_pipe, 0);
- ASSERT(r == 0);
-
- uv_pipe_open(&stdin_pipe, 0);
- uv_pipe_open(&stdout_pipe, 1);
-
- /* Unref both stdio handles to make sure that all writes complete. */
- uv_unref(loop);
- uv_unref(loop);
-
- for (i = 0; i < ARRAY_SIZE(buffers); i++) {
- buf[i] = uv_buf_init((char*)buffers[i], strlen(buffers[i]));
- }
-
- for (i = 0; i < ARRAY_SIZE(buffers); i++) {
- r = uv_write(&write_req[i], (uv_stream_t*)&stdout_pipe, &buf[i], 1,
- after_pipe_write);
- ASSERT(r == 0);
- }
-
- uv_run(loop);
-
- ASSERT(after_write_called == 7);
- ASSERT(on_pipe_read_called == 0);
- ASSERT(close_cb_called == 0);
-
- uv_ref(loop);
- uv_ref(loop);
-
- r = uv_read_start((uv_stream_t*)&stdin_pipe, on_read_alloc,
- on_pipe_read);
- ASSERT(r == 0);
-
- uv_run(loop);
-
- ASSERT(after_write_called == 7);
- ASSERT(on_pipe_read_called == 1);
- ASSERT(close_cb_called == 2);
-
- return 0;
-}
-
-
static int maybe_run_test(int argc, char **argv) {
if (strcmp(argv[1], "--list") == 0) {
print_tests(stdout);
@@ -389,7 +71,6 @@ static int maybe_run_test(int argc, char **argv) {
}
if (strcmp(argv[1], "ipc_send_recv_helper") == 0) {
- int ipc_send_recv_helper(void); /* See test-ipc-send-recv.c */
return ipc_send_recv_helper();
}
View
231 test/test-ipc.c
@@ -33,14 +33,22 @@ static int exit_cb_called;
static int read2_cb_called;
static int tcp_write_cb_called;
static int tcp_read_cb_called;
+static int on_pipe_read_called;
static int local_conn_accepted;
static int remote_conn_accepted;
static int tcp_server_listening;
-
static uv_write_t write_req;
+static uv_pipe_t channel;
+static uv_tcp_t tcp_server;
+static uv_write_t conn_notify_req;
+static int close_cb_called;
+static int connection_accepted;
+static int tcp_conn_read_cb_called;
+static int tcp_conn_write_cb_called;
typedef struct {
uv_connect_t conn_req;
+ uv_write_t tcp_write_req;
uv_tcp_t conn;
} tcp_conn;
@@ -52,7 +60,7 @@ static void close_server_conn_cb(uv_handle_t* handle) {
}
-static void ipc_on_connection(uv_stream_t* server, int status) {
+static void on_connection(uv_stream_t* server, int status) {
uv_tcp_t* conn;
int r;
@@ -159,7 +167,7 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf,
r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
ASSERT(r == 0);
- r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
+ r = uv_listen((uv_stream_t*)&tcp_server, 12, on_connection);
ASSERT(r == 0);
tcp_server_listening = 1;
@@ -385,3 +393,220 @@ TEST_IMPL(listen_no_simultaneous_accepts) {
return 0;
}
#endif
+
+
+/* Everything here runs in a child process. */
+
+tcp_conn conn;
+
+
+static void close_cb(uv_handle_t* handle) {
+ close_cb_called++;
+}
+
+
+static void conn_notify_write_cb(uv_write_t* req, int status) {
+ uv_close((uv_handle_t*)&tcp_server, close_cb);
+ uv_close((uv_handle_t*)&channel, close_cb);
+}
+
+
+static void tcp_connection_write_cb(uv_write_t* req, int status) {
+ ASSERT((uv_handle_t*)&conn.conn == (uv_handle_t*)req->handle);
+ uv_close((uv_handle_t*)req->handle, close_cb);
+ uv_close((uv_handle_t*)&channel, close_cb);
+ uv_close((uv_handle_t*)&tcp_server, close_cb);
+ tcp_conn_write_cb_called++;
+}
+
+
+static void on_tcp_child_process_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
+ uv_buf_t outbuf;
+ int r;
+
+ if (nread < 0) {
+ if (uv_last_error(tcp->loop).code == UV_EOF) {
+ free(buf.base);
+ return;
+ }
+
+ printf("error recving on tcp connection: %s\n",
+ uv_strerror(uv_last_error(tcp->loop)));
+ abort();
+ }
+
+ ASSERT(nread > 0);
+ ASSERT(memcmp("world\n", buf.base, nread) == 0);
+ on_pipe_read_called++;
+ free(buf.base);
+
+ /* Write to the socket */
+ outbuf = uv_buf_init("hello again\n", 12);
+ r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
+ ASSERT(r == 0);
+
+ tcp_conn_read_cb_called++;
+}
+
+
+static void connect_child_process_cb(uv_connect_t* req, int status) {
+ int r;
+
+ ASSERT(status == 0);
+ r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read);
+ ASSERT(r == 0);
+}
+
+
+static void ipc_on_connection(uv_stream_t* server, int status) {
+ int r;
+ uv_buf_t buf;
+
+ if (!connection_accepted) {
+ /*
+ * Accept the connection and close it. Also let the other
+ * side know.
+ */
+ ASSERT(status == 0);
+ ASSERT((uv_stream_t*)&tcp_server == server);
+
+ r = uv_tcp_init(server->loop, &conn.conn);
+ ASSERT(r == 0);
+
+ r = uv_accept(server, (uv_stream_t*)&conn.conn);
+ ASSERT(r == 0);
+
+ uv_close((uv_handle_t*)&conn.conn, close_cb);
+
+ buf = uv_buf_init("accepted_connection\n", 20);
+ r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
+ NULL, conn_notify_write_cb);
+ ASSERT(r == 0);
+
+ connection_accepted = 1;
+ }
+}
+
+
+static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
+ int r;
+ uv_buf_t buf;
+ uv_tcp_t* conn;
+
+ ASSERT(status == 0);
+ ASSERT((uv_stream_t*)&tcp_server == server);
+
+ conn = malloc(sizeof(*conn));
+ ASSERT(conn);
+
+ r = uv_tcp_init(server->loop, conn);
+ ASSERT(r == 0);
+
+ r = uv_accept(server, (uv_stream_t*)conn);
+ ASSERT(r == 0);
+
+ /* Send the accepted connection to the other process */
+ buf = uv_buf_init("hello\n", 6);
+ r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
+ (uv_stream_t*)conn, NULL);
+ ASSERT(r == 0);
+
+ r = uv_read_start((uv_stream_t*)conn, on_read_alloc, on_tcp_child_process_read);
+ ASSERT(r == 0);
+
+ uv_close((uv_handle_t*)conn, close_cb);
+}
+
+
+int ipc_helper(int listen_after_write) {
+ /*
+ * This is launched from test-ipc.c. stdin is a duplex channel that we
+ * over which a handle will be transmitted.
+ */
+
+ uv_write_t write_req;
+ int r;
+ uv_buf_t buf;
+
+ r = uv_pipe_init(uv_default_loop(), &channel, 1);
+ ASSERT(r == 0);
+
+ uv_pipe_open(&channel, 0);
+
+ ASSERT(uv_is_readable((uv_stream_t*) &channel));
+ ASSERT(uv_is_writable((uv_stream_t*) &channel));
+
+ r = uv_tcp_init(uv_default_loop(), &tcp_server);
+ ASSERT(r == 0);
+
+ r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT));
+ ASSERT(r == 0);
+
+ if (!listen_after_write) {
+ r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
+ ASSERT(r == 0);
+ }
+
+ buf = uv_buf_init("hello\n", 6);
+ r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
+ (uv_stream_t*)&tcp_server, NULL);
+ ASSERT(r == 0);
+
+ if (listen_after_write) {
+ r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
+ ASSERT(r == 0);
+ }
+
+ r = uv_run(uv_default_loop());
+ ASSERT(r == 0);
+
+ ASSERT(connection_accepted == 1);
+ ASSERT(close_cb_called == 3);
+
+ return 0;
+}
+
+
+int ipc_helper_tcp_connection() {
+ /*
+ * This is launched from test-ipc.c. stdin is a duplex channel that we
+ * over which a handle will be transmitted.
+ */
+
+ int r;
+ struct sockaddr_in addr;
+
+ r = uv_pipe_init(uv_default_loop(), &channel, 1);
+ ASSERT(r == 0);
+
+ uv_pipe_open(&channel, 0);
+
+ ASSERT(uv_is_readable((uv_stream_t*)&channel));
+ ASSERT(uv_is_writable((uv_stream_t*)&channel));
+
+ r = uv_tcp_init(uv_default_loop(), &tcp_server);
+ ASSERT(r == 0);
+
+ r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT));
+ ASSERT(r == 0);
+
+ r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection_tcp_conn);
+ ASSERT(r == 0);
+
+ /* Make a connection to the server */
+ r = uv_tcp_init(uv_default_loop(), &conn.conn);
+ ASSERT(r == 0);
+
+ addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
+ r = uv_tcp_connect(&conn.conn_req, (uv_tcp_t*)&conn.conn, addr, connect_child_process_cb);
+ ASSERT(r == 0);
+
+ r = uv_run(uv_default_loop());
+ ASSERT(r == 0);
+
+ ASSERT(tcp_conn_read_cb_called == 1);
+ ASSERT(tcp_conn_write_cb_called == 1);
+ ASSERT(close_cb_called == 4);
+
+ return 0;
+}
View
99 test/test-stdio-over-pipes.c
@@ -40,6 +40,7 @@ static uv_loop_t* loop;
static char output[OUTPUT_SIZE];
static int output_used;
+
typedef struct {
uv_write_t req;
uv_buf_t buf;
@@ -155,3 +156,101 @@ TEST_IMPL(stdio_over_pipes) {
return 0;
}
+
+/* Everything here runs in a child process. */
+
+static int on_pipe_read_called;
+static int after_write_called;
+static uv_pipe_t stdin_pipe;
+static uv_pipe_t stdout_pipe;
+
+static void on_pipe_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
+ ASSERT(nread > 0);
+ ASSERT(memcmp("hello world\n", buf.base, nread) == 0);
+ on_pipe_read_called++;
+
+ free(buf.base);
+
+ uv_close((uv_handle_t*)&stdin_pipe, close_cb);
+ uv_close((uv_handle_t*)&stdout_pipe, close_cb);
+}
+
+
+static void after_pipe_write(uv_write_t* req, int status) {
+ ASSERT(status == 0);
+ after_write_called++;
+}
+
+
+static uv_buf_t on_read_alloc(uv_handle_t* handle,
+ size_t suggested_size) {
+ uv_buf_t buf;
+ buf.base = (char*)malloc(suggested_size);
+ buf.len = suggested_size;
+ return buf;
+}
+
+
+int stdio_over_pipes_helper() {
+ /* Write several buffers to test that the write order is preserved. */
+ char* buffers[] = {
+ "he",
+ "ll",
+ "o ",
+ "wo",
+ "rl",
+ "d",
+ "\n"
+ };
+
+ uv_write_t write_req[ARRAY_SIZE(buffers)];
+ uv_buf_t buf[ARRAY_SIZE(buffers)];
+ int r, i;
+ uv_loop_t* loop = uv_default_loop();
+
+ ASSERT(UV_NAMED_PIPE == uv_guess_handle(0));
+ ASSERT(UV_NAMED_PIPE == uv_guess_handle(1));
+
+ r = uv_pipe_init(loop, &stdin_pipe, 0);
+ ASSERT(r == 0);
+ r = uv_pipe_init(loop, &stdout_pipe, 0);
+ ASSERT(r == 0);
+
+ uv_pipe_open(&stdin_pipe, 0);
+ uv_pipe_open(&stdout_pipe, 1);
+
+ /* Unref both stdio handles to make sure that all writes complete. */
+ uv_unref(loop);
+ uv_unref(loop);
+
+ for (i = 0; i < ARRAY_SIZE(buffers); i++) {
+ buf[i] = uv_buf_init((char*)buffers[i], strlen(buffers[i]));
+ }
+
+ for (i = 0; i < ARRAY_SIZE(buffers); i++) {
+ r = uv_write(&write_req[i], (uv_stream_t*)&stdout_pipe, &buf[i], 1,
+ after_pipe_write);
+ ASSERT(r == 0);
+ }
+
+ uv_run(loop);
+
+ ASSERT(after_write_called == 7);
+ ASSERT(on_pipe_read_called == 0);
+ ASSERT(close_cb_called == 0);
+
+ uv_ref(loop);
+ uv_ref(loop);
+
+ r = uv_read_start((uv_stream_t*)&stdin_pipe, on_read_alloc,
+ on_pipe_read);
+ ASSERT(r == 0);
+
+ uv_run(loop);
+
+ ASSERT(after_write_called == 7);
+ ASSERT(on_pipe_read_called == 1);
+ ASSERT(close_cb_called == 2);
+
+ return 0;
+}

0 comments on commit 41ddc95

Please sign in to comment.
Something went wrong with that request. Please try again.