diff --git a/base/loading.jl b/base/loading.jl index fd0199430b533..f2bf7c2f71d77 100644 --- a/base/loading.jl +++ b/base/loading.jl @@ -296,7 +296,7 @@ function create_expr_cache(input::AbstractString, output::AbstractString) eval(Main, deserialize(STDIN)) end """ - io = open(detach(`$(julia_cmd()) + io, pobj = open(detach(`$(julia_cmd()) --output-ji $output --output-incremental=yes --startup-file=no --history-file=no --eval $code_object`), "w", STDOUT) @@ -321,9 +321,9 @@ function create_expr_cache(input::AbstractString, output::AbstractString) delete!(task_local_storage(), :SOURCE_PATH) end) end - close(io.in) - wait(io) - return io + close(io) + wait(pobj) + return pobj end compilecache(mod::Symbol) = compilecache(string(mod)) diff --git a/base/managers.jl b/base/managers.jl index bddca3f68cf85..5102eecb70097 100644 --- a/base/managers.jl +++ b/base/managers.jl @@ -106,10 +106,10 @@ function launch_on_machine(manager::SSHManager, machine, cnt, params, launched, cmd = `ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host $(shell_escape(cmd))` # use ssh to remote launch # launch - pobj = open(detach(cmd), "r") + io, pobj = open(detach(cmd), "r") wconfig = WorkerConfig() - wconfig.io = pobj.out + wconfig.io = io wconfig.host = host wconfig.tunnel = params[:tunnel] wconfig.sshflags = sshflags @@ -188,11 +188,11 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi exeflags = params[:exeflags] for i in 1:manager.np - pobj = open(detach( + io, pobj = open(detach( setenv(`$(julia_cmd(exename)) $exeflags --bind-to $(LPROC.bind_addr) --worker`, dir=dir)), "r") wconfig = WorkerConfig() wconfig.process = pobj - wconfig.io = pobj.out + wconfig.io = io push!(launched, wconfig) end diff --git a/base/multi.jl b/base/multi.jl index 530029d2d3ae3..58004f45d21ef 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -1032,6 +1032,7 @@ end # The master process uses this to connect to the worker and subsequently # setup a all-to-all network. function read_worker_host_port(io::IO) + io.line_buffered = true while true conninfo = readline(io) bind_addr, port = parse_connection_info(conninfo) @@ -1269,8 +1270,8 @@ function launch_additional(np::Integer, cmd::Cmd) addresses = cell(np) for i in 1:np - pobj = open(detach(cmd), "r") - io_objs[i] = pobj.out + io, pobj = open(detach(cmd), "r") + io_objs[i] = io end for (i,io) in enumerate(io_objs) diff --git a/base/process.jl b/base/process.jl index 56f41390d7655..879e4288bf356 100644 --- a/base/process.jl +++ b/base/process.jl @@ -465,38 +465,31 @@ eachline(cmd::AbstractCmd) = eachline(cmd, DevNull) # return a Process object to read-to/write-from the pipeline function open(cmds::AbstractCmd, mode::AbstractString="r", other::AsyncStream=DevNull) - if mode == "r+" || mode == "w+" - other === DevNull || throw(ArgumentError("no other stream for mode rw+")) - in = Pipe() - out = Pipe() - processes = spawn(cmds, (in,out,STDERR)) - close(in.out) - close(out.in) - elseif mode == "r" + if mode == "r" in = other - out = Pipe() + out = io = Pipe() processes = spawn(cmds, (in,out,STDERR)) close(out.in) elseif mode == "w" - in = Pipe() + in = io = Pipe() out = other processes = spawn(cmds, (in,out,STDERR)) close(in.out) else throw(ArgumentError("mode must be \"r\" or \"w\", not \"$mode\"")) end - return processes + return (io, processes) end function open(f::Function, cmds::AbstractCmd, args...) - P = open(cmds, args...) + io, P = open(cmds, args...) ret = try - f(P) + f(io) catch kill(P) rethrow() finally - close(P) + close(io) end success(P) || pipeline_error(P) return ret @@ -504,13 +497,14 @@ end # TODO: deprecate this function readandwrite(cmds::AbstractCmd) - processes = open(cmds, "r+") - (processes.out, processes.in, processes) + in = Pipe() + out, processes = open(cmds, "r", in) + (out, in, processes) end function readbytes(cmd::AbstractCmd, stdin::AsyncStream=DevNull) - procs = open(cmd, "r", stdin) - bytes = readbytes(procs.out) + out, procs = open(cmd, "r", stdin) + bytes = readbytes(out) !success(procs) && pipeline_error(procs) return bytes end diff --git a/base/stream.jl b/base/stream.jl index 18b2085369560..a72526978b9a0 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -546,7 +546,6 @@ iswritable(io::AbstractPipe) = iswritable(io.in) read{T<:AbstractPipe}(io::T, args...) = read(io.out, args...) write{T<:AbstractPipe}(io::T, args...) = write(io.in, args...) write{S<:AbstractPipe,T}(io::S, a::Array{T}) = write(io.in, a) -buffer_or_write(io::AbstractPipe, p::Ptr, n::Integer) = buffer_or_write(io.in, p, n) readuntil{T<:AbstractPipe}(io::T, args...) = readuntil(io.out, args...) read!{T<:AbstractPipe}(io::T, args...) = read!(io.out, args...) readbytes(io::AbstractPipe) = readbytes(io.out) diff --git a/examples/clustermanager/simple/UnixDomainCM.jl b/examples/clustermanager/simple/UnixDomainCM.jl index f5d5339c9dfb0..2c356cee3847d 100644 --- a/examples/clustermanager/simple/UnixDomainCM.jl +++ b/examples/clustermanager/simple/UnixDomainCM.jl @@ -12,10 +12,10 @@ function launch(manager::UnixDomainCM, params::Dict, launched::Array, c::Conditi sockname = tempname() try cmd = `$(params[:exename]) $(@__FILE__) udwrkr $sockname` - pobj = open(cmd, "r") + io, pobj = open(cmd, "r") wconfig = WorkerConfig() - wconfig.userdata = Dict(:sockname=>sockname, :io=>pobj.out, :process=>pobj) + wconfig.userdata = Dict(:sockname=>sockname, :io=>io, :process=>pobj) push!(launched, wconfig) notify(c) catch e diff --git a/test/examples.jl b/test/examples.jl index 284ad096ca8a4..fb64ce6b418ad 100644 --- a/test/examples.jl +++ b/test/examples.jl @@ -40,11 +40,10 @@ include(joinpath(dir, "queens.jl")) script = joinpath(dir, "clustermanager/simple/test_simple.jl") cmd = `$(joinpath(JULIA_HOME,Base.julia_exename())) $script` - proc = open(cmd) - errors = readall(proc) + (strm, proc) = open(cmd) wait(proc) if !success(proc) && ccall(:jl_running_on_valgrind,Cint,()) == 0 - println(errors) + println(readall(strm)) error("UnixDomainCM failed test, cmd : $cmd") end end diff --git a/test/spawn.jl b/test/spawn.jl index 06cb63016ef68..cc33cb0811010 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -231,10 +231,10 @@ sleep(1) import Base.zzzInvalidIdentifier """ try - in = open(`$exename -f`, "w") + (in,p) = open(`$exename -f`, "w") write(in,cmd) close(in) - wait(in) + wait(p) catch error("IOStream redirect failed. Child stderr was \n$(readall(fname))\n") end