pipeline() cannot read stdout into IOBuffer #14437

Open
c42f opened this Issue Dec 18, 2015 · 7 comments

4 participants
@c42f
Contributor

c42f commented Dec 18, 2015

Reproduce with

julia> run(pipeline(`ls`, stdout=IOBuffer()))
ERROR: MethodError: `uvhandle` has no method matching uvhandle(::Base.AbstractIOBuffer{Array{UInt8,1}})
 in _jl_spawn at process.jl:253
 in anonymous at process.jl:415
 in setup_stdio at process.jl:403
 in spawn at process.jl:414
 in spawn at process.jl:293
 in run at process.jl:530

@tkelman tkelman added the I/O label Dec 18, 2015

@vtjnash vtjnash added the wontfix label Jul 27, 2016

Member

vtjnash commented Jul 27, 2016

We can't readily support arbitrary Julia IO objects by converting them to kernel objects. The right object to pass as stdout here is Pipe().

@tkelman tkelman added the needs docs label Jul 27, 2016

Contributor

c42f commented Jul 27, 2016

Right, it sounds like I was doing something fundamentally misguided here.

To give a bit more context about where I went wrong: I was trying to read data from both stdout and stderr, and I wanted to read them into memory rather than going through temporary files. If these are both Pipe objects, making sure things don't block becomes a bit more interesting, since I can't just call readstring(). After some fiddling I still don't seem to have the right combination of spawn and @async.

out = Pipe()
err = Pipe()
spawn(pipeline(`ls`, stdout=out, stderr=err)) # avoiding run, as that'll potentially block when the pipe gets full
# what now!
Member

vtjnash commented Jul 27, 2016

out = Pipe()
err = Pipe()
p = spawn(pipeline(`ls`, stdout=out, stderr=err))
close(out.in); close(err.in)
err_t = @async readstring(err)
out_s = readstring(out)
err_s = wait(err_t)
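
For readers on Julia 1.x, where `spawn` and `readstring` are no longer available, the same pattern can be sketched roughly as below. This is only an illustration under assumptions: `capture` is a hypothetical helper name, the example command assumes a Unix-like `sh`, and `run(...; wait=false)`, `read(io, String)` and `fetch` stand in for `spawn`, `readstring` and `wait(task)` from the snippet above.

```julia
# Sketch for Julia 1.x; `capture` is a hypothetical name, not a Base function.
function capture(cmd::Cmd)
    out = Pipe()
    err = Pipe()
    # wait=false starts the process without blocking, so a full pipe cannot stall us here.
    proc = run(pipeline(cmd, stdout=out, stderr=err), wait=false)
    # Close the parent's copies of the write ends so the readers can see EOF.
    close(out.in)
    close(err.in)
    err_task = @async read(err, String)  # drain stderr concurrently
    out_str = read(out, String)          # drain stdout in the current task
    err_str = fetch(err_task)            # collect the result of the stderr task
    return (out=out_str, err=err_str, ok=success(proc))
end

# Example usage (assumes a Unix-like shell is available):
result = capture(`sh -c 'echo to-stdout; echo to-stderr 1>&2'`)
```

Reading one stream in a task while reading the other directly keeps either pipe from filling up and blocking the child process.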
Contributor

c42f commented Jul 28, 2016

Thanks a lot for the snippet!

A question about the won't fix label - it seems like it'd be possible to support an arbitrary IO by delegating to Pipe() as the underlying object passed to libuv, and reading bytes asynchronously from that pipe into any arbitrary user-defined IO object. Is there a reason why this fundamentally won't work or is a bad idea?

Contributor

c42f commented Jul 28, 2016

Proof of concept, which does appear to work:

import Base: AbstractCmd, StdIOSet, Callback, ProcessChain, STDOUT_NO, STDERR_NO, STDIN_NO

# Copy stream src to dest in preferred chunks of size `bufsize`.  Perhaps there's a builtin function for this.
function copystream(src::IO, dest::IO, bufsize=10*1024)
    buf = Vector{UInt8}(bufsize)
    while isopen(src)
        nread = readbytes!(src, buf)
        write(dest, buf[1:nread]) # fixme - should avoid the slice, but unsure how to do so.
        #println("wrote $nread bytes")
    end
end

# Special duplicate of Base.CmdRedirect for redirecting to IOBuffer - would
# need generalization.
immutable CmdRedirect2 <: AbstractCmd
    cmd::AbstractCmd
    handle::IOBuffer
    stream_no::Int
    pipe::Pipe
end

CmdRedirect2(cmd, handle, stream_no) = CmdRedirect2(cmd, handle, stream_no, Pipe())


Base.redir_out(src::AbstractCmd, dest::IOBuffer) = CmdRedirect2(src, dest, STDOUT_NO)
Base.redir_err(src::AbstractCmd, dest::IOBuffer) = CmdRedirect2(src, dest, STDERR_NO)


function Base.spawn(redirect::CmdRedirect2, stdios::StdIOSet, exitcb::Callback, closecb::Callback; chain::Nullable{ProcessChain}=Nullable{ProcessChain}())
    p = spawn(redirect.cmd,
          (redirect.stream_no == STDIN_NO  ? redirect.pipe : stdios[1],
           redirect.stream_no == STDOUT_NO ? redirect.pipe : stdios[2],
           redirect.stream_no == STDERR_NO ? redirect.pipe : stdios[3]),
          exitcb, closecb, chain=chain)
    close(redirect.pipe.in)
    # Um.  Is using @async acceptable here?
    @async copystream(redirect.pipe.out, redirect.handle)
    p
end


#-------------------------------------------------------------------------------
# Usage
out = IOBuffer()
err = IOBuffer()
run(pipeline(`ls -R /home/cfoster/tmp`, stdout=out, stderr=err))
Contributor

c42f commented Jul 28, 2016

There are lots of (I think fixable) things wrong with the above PoC, of course. For example, using @async for the stream copy means that run() will probably return before the contents of the pipe buffer have been fully copied across.
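
One way to address that particular problem is to keep a handle on the copy task and wait for it after the process exits. The following is a minimal sketch for Julia 1.x rather than a patch against the PoC: `run_into` is a hypothetical helper name, it only redirects stdout, and the example command assumes a Unix-like `echo`.

```julia
# Sketch for Julia 1.x; `run_into` is a hypothetical name, not a Base function.
function run_into(cmd::Cmd, dest::IO)
    pipe = Pipe()
    # wait=false starts the process without blocking the current task.
    proc = run(pipeline(cmd, stdout=pipe), wait=false)
    # Close the parent's copy of the write end so the reader can see EOF.
    close(pipe.in)
    copier = @async begin
        # Copy whatever is available until the pipe reports EOF.
        while !eof(pipe)
            write(dest, readavailable(pipe))
        end
    end
    wait(proc)    # the process has exited
    wait(copier)  # every byte has reached `dest`
    return proc
end

# Example usage with an in-memory buffer as the destination:
buf = IOBuffer()
run_into(`echo hello`, buf)
```

Here the task itself acts as the completion signal, so the destination IO does not need to know anything about EOF.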

Member

vtjnash commented Jul 28, 2016

That looks about right to me (the @async call looks correct as well). I don't see how you can signal EOF through the downstream IO objects. Maybe all that needs is an API for signaling it, but it also would seem to imply every IO object needs to contain a Condition object for EOF notification.

If the above code were turned into a package, I could see this EOF information simply being handled via a side channel. For example, you could subtype AbstractPipe to quickly make a new abstraction. Maybe something like:

import Base: AbstractPipe, Pipe, pipe_reader, pipe_writer, eof
immutable PipeFitting <: AbstractPipe
    eof::Condition
    out::IO
    in::Pipe
    function PipeFitting(out)
        p = new(Condition(), out, Pipe())
        # copy from the pipe (written by the process) into the downstream IO
        @async copystream(p.in, p.out)
        return p
    end
end
pipe_reader(p::PipeFitting) = p.out
pipe_writer(p::PipeFitting) = p.in

seteof(p::PipeFitting) = (close(p.in); notify(p.eof))
# nb_available returns a byte count, so compare it against zero
eof(p::PipeFitting) = nb_available(p.out) > 0 ? false : (isopen(p.in) ? (wait(p.eof); true) : true)

@JeffBezanson JeffBezanson removed the wontfix label Aug 15, 2016
