Skip to content

Commit

Permalink
fix flush for libuv streams
Browse files Browse the repository at this point in the history
  • Loading branch information
vtjnash committed Dec 26, 2017
1 parent 1a6e34b commit fd74f16
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 45 deletions.
15 changes: 8 additions & 7 deletions base/stream.jl
Expand Up @@ -872,14 +872,15 @@ function unsafe_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
end

function flush(s::LibuvStream)
if s.sendbuf === nothing
return
end
buf = s.sendbuf
if nb_available(buf) > 0
arr = take!(buf) # Array of UInt8s
uv_write(s, arr)
if s.sendbuf !== nothing
buf = s.sendbuf
if nb_available(buf) > 0
arr = take!(buf) # Array of UInt8s
uv_write(s, arr)
return
end
end
uv_write(s, Ptr{UInt8}(Base.eventloop()), UInt(0)) # zero write from a random pointer to flush current queue
return
end

Expand Down
70 changes: 32 additions & 38 deletions src/jl_uv.c
Expand Up @@ -98,15 +98,33 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
free(handle);
}

static void jl_uv_shutdownCallback(uv_shutdown_t *req, int status)
{
/*
* This happens if the remote machine closes the connecition while we're
* in the shutdown request (in that case we call uv_close, thus cancelling
* this request).
*/
if (status != UV__ECANCELED && !uv_is_closing((uv_handle_t*)req->handle)) {
uv_close((uv_handle_t*)req->handle, &jl_uv_closeHandle);
static void jl_uv_flush_close_callback(uv_write_t *req, int status)
{
uv_stream_t *stream = req->handle;
req->handle = NULL;
// ignore attempts to close the stream while attempting a graceful shutdown
#ifdef _OS_WINDOWS_
if (stream->stream.conn.shutdown_req)
#else
if (stream->shutdown_req)
#endif
{
free(req);
return;
}
if (status == 0 && uv_is_writable(stream) && stream->write_queue_size != 0) {
// new data was written, wait for it to flush too
uv_buf_t buf;
buf.base = (char*)(req + 1);
buf.len = 0;
req->data = NULL;
if (uv_write(req, stream, &buf, 1, (uv_write_cb)jl_uv_flush_close_callback) == 0)
return;
}
if (!uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
}
free(req);
}
Expand Down Expand Up @@ -196,39 +214,15 @@ JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)
return;
}

if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP) {
uv_stream_t *stream = (uv_stream_t*)handle;
#ifdef _OS_WINDOWS_
if (stream->stream.conn.shutdown_req) {
#else
if (stream->shutdown_req) {
#endif
// don't close the stream while attempting a graceful shutdown
return;
}
if (uv_is_writable(stream) && stream->write_queue_size != 0) {
// attempt graceful shutdown of writable streams to give them a chance to flush first
// TODO: introduce a uv_drain cb API instead of abusing uv_shutdown in this way
uv_shutdown_t *req = (uv_shutdown_t*)malloc(sizeof(uv_shutdown_t));
req->data = 0;
/*
* We are explicitly ignoring the error here for the following reason:
* There is only two scenarios in which this returns an error:
* a) In case the stream is already shut down, in which case we're likely
* in the process of closing this stream (since there's no other call to
* uv_shutdown).
* b) In case the stream is already closed, in which case uv_close would
* cause an assertion failure.
*/
uv_shutdown(req, stream, &jl_uv_shutdownCallback);
return;
}
if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
uv_write_t *req = (uv_write_t*)malloc(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
return;
}

if (!uv_is_closing(handle)) {
// avoid double-closing the stream
if (handle->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)handle, UV_TTY_MODE_NORMAL);
uv_close(handle, &jl_uv_closeHandle);
}
}
Expand Down
15 changes: 15 additions & 0 deletions test/read.jl
Expand Up @@ -537,3 +537,18 @@ end # mktempdir() do dir
@test countlines(file,'\n') == 4
rm(file)
end

let p = Pipe()
Base.link_pipe(p, julia_only_read=true, julia_only_write=true)
t = @schedule read(p)
@sync begin
@async write(p, zeros(UInt16, 660_000))
for i = 1:typemax(UInt16)
@async write(p, UInt16(i))
end
@async close(p.in)
end
s = reinterpret(UInt16, wait(t))
@test length(s) == 660_000 + typemax(UInt16)
@test s[(end - typemax(UInt16)):end] == UInt16.(0:typemax(UInt16))
end

0 comments on commit fd74f16

Please sign in to comment.