From fd74f16b51e68592032633b553b63894727aa012 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Mon, 18 Dec 2017 15:08:39 -0500 Subject: [PATCH] fix flush for libuv streams --- base/stream.jl | 15 ++++++----- src/jl_uv.c | 70 +++++++++++++++++++++++--------------------------- test/read.jl | 15 +++++++++++ 3 files changed, 55 insertions(+), 45 deletions(-) diff --git a/base/stream.jl b/base/stream.jl index e95fb4323205b..953c02fe7c1de 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -872,14 +872,15 @@ function unsafe_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) end function flush(s::LibuvStream) - if s.sendbuf === nothing - return - end - buf = s.sendbuf - if nb_available(buf) > 0 - arr = take!(buf) # Array of UInt8s - uv_write(s, arr) + if s.sendbuf !== nothing + buf = s.sendbuf + if nb_available(buf) > 0 + arr = take!(buf) # Array of UInt8s + uv_write(s, arr) + return + end end + uv_write(s, Ptr{UInt8}(Base.eventloop()), UInt(0)) # zero write from a random pointer to flush current queue return end diff --git a/src/jl_uv.c b/src/jl_uv.c index 4753655bbdd9d..80ffc4d0fb7b7 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -98,15 +98,33 @@ static void jl_uv_closeHandle(uv_handle_t *handle) free(handle); } -static void jl_uv_shutdownCallback(uv_shutdown_t *req, int status) -{ - /* - * This happens if the remote machine closes the connecition while we're - * in the shutdown request (in that case we call uv_close, thus cancelling - * this request). - */ - if (status != UV__ECANCELED && !uv_is_closing((uv_handle_t*)req->handle)) { - uv_close((uv_handle_t*)req->handle, &jl_uv_closeHandle); +static void jl_uv_flush_close_callback(uv_write_t *req, int status) +{ + uv_stream_t *stream = req->handle; + req->handle = NULL; + // ignore attempts to close the stream while attempting a graceful shutdown +#ifdef _OS_WINDOWS_ + if (stream->stream.conn.shutdown_req) +#else + if (stream->shutdown_req) +#endif + { + free(req); + return; + } + if (status == 0 && uv_is_writable(stream) && stream->write_queue_size != 0) { + // new data was written, wait for it to flush too + uv_buf_t buf; + buf.base = (char*)(req + 1); + buf.len = 0; + req->data = NULL; + if (uv_write(req, stream, &buf, 1, (uv_write_cb)jl_uv_flush_close_callback) == 0) + return; + } + if (!uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream + if (stream->type == UV_TTY) + uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL); + uv_close((uv_handle_t*)stream, &jl_uv_closeHandle); } free(req); } @@ -196,39 +214,15 @@ JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle) return; } - if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP) { - uv_stream_t *stream = (uv_stream_t*)handle; -#ifdef _OS_WINDOWS_ - if (stream->stream.conn.shutdown_req) { -#else - if (stream->shutdown_req) { -#endif - // don't close the stream while attempting a graceful shutdown - return; - } - if (uv_is_writable(stream) && stream->write_queue_size != 0) { - // attempt graceful shutdown of writable streams to give them a chance to flush first - // TODO: introduce a uv_drain cb API instead of abusing uv_shutdown in this way - uv_shutdown_t *req = (uv_shutdown_t*)malloc(sizeof(uv_shutdown_t)); - req->data = 0; - /* - * We are explicitly ignoring the error here for the following reason: - * There is only two scenarios in which this returns an error: - * a) In case the stream is already shut down, in which case we're likely - * in the process of closing this stream (since there's no other call to - * uv_shutdown). - * b) In case the stream is already closed, in which case uv_close would - * cause an assertion failure. - */ - uv_shutdown(req, stream, &jl_uv_shutdownCallback); - return; - } + if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) { + uv_write_t *req = (uv_write_t*)malloc(sizeof(uv_write_t)); + req->handle = (uv_stream_t*)handle; + jl_uv_flush_close_callback(req, 0); + return; } if (!uv_is_closing(handle)) { // avoid double-closing the stream - if (handle->type == UV_TTY) - uv_tty_set_mode((uv_tty_t*)handle, UV_TTY_MODE_NORMAL); uv_close(handle, &jl_uv_closeHandle); } } diff --git a/test/read.jl b/test/read.jl index c72ac33deb9b4..d590a378687d8 100644 --- a/test/read.jl +++ b/test/read.jl @@ -537,3 +537,18 @@ end # mktempdir() do dir @test countlines(file,'\n') == 4 rm(file) end + +let p = Pipe() + Base.link_pipe(p, julia_only_read=true, julia_only_write=true) + t = @schedule read(p) + @sync begin + @async write(p, zeros(UInt16, 660_000)) + for i = 1:typemax(UInt16) + @async write(p, UInt16(i)) + end + @async close(p.in) + end + s = reinterpret(UInt16, wait(t)) + @test length(s) == 660_000 + typemax(UInt16) + @test s[(end - typemax(UInt16)):end] == UInt16.(0:typemax(UInt16)) +end