Permalink
Browse files

support half-duplex pipes

  • Loading branch information...
1 parent 53eb993 commit f9be43a564c55a6e28cc2de730cb8d7131d8e558 Igor Zinkovsky committed Feb 8, 2012
Showing with 95 additions and 24 deletions.
  1. +7 −0 include/uv.h
  2. +8 −0 src/unix/stream.c
  3. +64 −24 src/win/pipe.c
  4. +10 −0 src/win/stream.c
  5. +3 −0 test/run-tests.c
  6. +3 −0 test/test-ping-pong.c
View
@@ -470,6 +470,13 @@ struct uv_write_s {
};
+/*
+ * Used to determine whether a stream is readable or writable.
+ * TODO: export in v0.8.
+ */
+/* UV_EXTERN */ int uv_is_readable(uv_stream_t* handle);
+/* UV_EXTERN */ int uv_is_writable(uv_stream_t* handle);
+
/*
* uv_tcp_t is a subclass of uv_stream_t
View
@@ -968,3 +968,11 @@ int uv_read_stop(uv_stream_t* stream) {
}
+int uv_is_readable(uv_stream_t* stream) {
+ return stream->flags & UV_READABLE;
+}
+
+
+int uv_is_writable(uv_stream_t* stream) {
+ return stream->flags & UV_WRITABLE;
+}
View
@@ -98,6 +98,62 @@ static void uv_pipe_connection_init(uv_pipe_t* handle) {
}
+static int open_named_pipe(uv_pipe_t* handle) {
+ /*
+ * Assume that we have a duplex pipe first, so attempt to
+ * connect with GENERIC_READ | GENERIC_WRITE.
+ */
+ handle->handle = CreateFileW(handle->name,
+ GENERIC_READ | GENERIC_WRITE,
+ 0,
+ NULL,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ NULL);
+
+ if (handle->handle != INVALID_HANDLE_VALUE) {
+ return 0;
+ }
+
+ /*
+ * If the pipe is not duplex CreateFileW fails with
+ * ERROR_ACCESS_DENIED. In that case try to connect
+ * as a read-only or write-only.
+ */
+ if (GetLastError() == ERROR_ACCESS_DENIED) {
+ handle->handle = CreateFileW(handle->name,
+ GENERIC_READ | FILE_WRITE_ATTRIBUTES,
+ 0,
+ NULL,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ NULL);
+
+ if (handle->handle != INVALID_HANDLE_VALUE) {
+ handle->flags |= UV_HANDLE_SHUT;
+ return 0;
+ }
+ }
+
+ if (GetLastError() == ERROR_ACCESS_DENIED) {
+ handle->handle = CreateFileW(handle->name,
+ GENERIC_WRITE | FILE_READ_ATTRIBUTES,
+ 0,
+ NULL,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ NULL);
+
+ if (handle->handle != INVALID_HANDLE_VALUE) {
+ handle->flags |= UV_HANDLE_EOF;
+ return 0;
+ }
+ }
+
+ return -1;
+}
+
+
int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
char* name, size_t nameSize) {
HANDLE pipeHandle;
@@ -437,15 +493,7 @@ static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
/* We wait for the pipe to become available with WaitNamedPipe. */
while (WaitNamedPipeW(handle->name, 30000)) {
/* The pipe is now available, try to connect. */
- pipeHandle = CreateFileW(handle->name,
- GENERIC_READ | GENERIC_WRITE,
- 0,
- NULL,
- OPEN_EXISTING,
- FILE_FLAG_OVERLAPPED,
- NULL);
-
- if (pipeHandle != INVALID_HANDLE_VALUE) {
+ if (open_named_pipe(handle) == 0) {
break;
}
@@ -471,7 +519,6 @@ void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
const char* name, uv_connect_cb cb) {
uv_loop_t* loop = handle->loop;
int errno, nameSize;
- HANDLE pipeHandle;
handle->handle = INVALID_HANDLE_VALUE;
@@ -492,15 +539,7 @@ void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
goto error;
}
- pipeHandle = CreateFileW(handle->name,
- GENERIC_READ | GENERIC_WRITE,
- 0,
- NULL,
- OPEN_EXISTING,
- FILE_FLAG_OVERLAPPED,
- NULL);
-
- if (pipeHandle == INVALID_HANDLE_VALUE) {
+ if (open_named_pipe(handle) != 0) {
if (GetLastError() == ERROR_PIPE_BUSY) {
/* Wait for the server to make a pipe instance available. */
if (!QueueUserWorkItem(&pipe_connect_thread_proc,
@@ -519,13 +558,13 @@ void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
goto error;
}
- if (uv_set_pipe_handle(loop, (uv_pipe_t*)req->handle, pipeHandle)) {
+ assert(handle->handle != INVALID_HANDLE_VALUE);
+
+ if (uv_set_pipe_handle(loop, (uv_pipe_t*)req->handle, handle->handle)) {
errno = GetLastError();
goto error;
}
- handle->handle = pipeHandle;
-
SET_REQ_SUCCESS(req);
uv_insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
@@ -537,8 +576,9 @@ void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
handle->name = NULL;
}
- if (pipeHandle != INVALID_HANDLE_VALUE) {
- CloseHandle(pipeHandle);
+ if (handle->handle != INVALID_HANDLE_VALUE) {
+ CloseHandle(handle->handle);
+ handle->handle = INVALID_HANDLE_VALUE;
}
/* Make this req pending reporting an error. */
View
@@ -186,3 +186,13 @@ size_t uv_count_bufs(uv_buf_t bufs[], int count) {
return bytes;
}
+
+
+int uv_is_readable(uv_stream_t* handle) {
+ return !(handle->flags & UV_HANDLE_EOF);
+}
+
+
+int uv_is_writable(uv_stream_t* handle) {
+ return !(handle->flags & UV_HANDLE_SHUT);
+}
View
@@ -129,6 +129,9 @@ static int ipc_helper(int listen_after_write) {
uv_pipe_open(&channel, 0);
+ ASSERT(uv_is_readable(&channel));
+ ASSERT(uv_is_writable(&channel));
+
r = uv_tcp_init(uv_default_loop(), &tcp_server);
ASSERT(r == 0);
View
@@ -138,6 +138,9 @@ static void pinger_on_connect(uv_connect_t *req, int status) {
ASSERT(status == 0);
+ ASSERT(uv_is_readable(req->handle));
+ ASSERT(uv_is_writable(req->handle));
+
pinger_write_ping(pinger);
uv_read_start((uv_stream_t*)(req->handle), alloc_cb, pinger_read_cb);

0 comments on commit f9be43a

Please sign in to comment.