Skip to content

Commit

Permalink
Revert "Revert open(cmd) => Process change"
Browse files Browse the repository at this point in the history
This reverts commit 8ffdfc2.
  • Loading branch information
vtjnash committed Aug 26, 2015
1 parent aa8cd2e commit 955fab4
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 28 deletions.
8 changes: 4 additions & 4 deletions base/loading.jl
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ function create_expr_cache(input::AbstractString, output::AbstractString)
eval(Main, deserialize(STDIN))
end
"""
io, pobj = open(detach(`$(julia_cmd())
io = open(detach(`$(julia_cmd())
--output-ji $output --output-incremental=yes
--startup-file=no --history-file=no
--eval $code_object`), "w", STDOUT)
Expand All @@ -321,9 +321,9 @@ function create_expr_cache(input::AbstractString, output::AbstractString)
delete!(task_local_storage(), :SOURCE_PATH)
end)
end
close(io)
wait(pobj)
return pobj
close(io.in)
wait(io)
return io
end

compilecache(mod::Symbol) = compilecache(string(mod))
Expand Down
8 changes: 4 additions & 4 deletions base/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ function launch_on_machine(manager::SSHManager, machine, cnt, params, launched,
cmd = `ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host $(shell_escape(cmd))` # use ssh to remote launch

# launch
io, pobj = open(detach(cmd), "r")
pobj = open(detach(cmd), "r")
wconfig = WorkerConfig()

wconfig.io = io
wconfig.io = pobj.out
wconfig.host = host
wconfig.tunnel = params[:tunnel]
wconfig.sshflags = sshflags
Expand Down Expand Up @@ -188,11 +188,11 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
exeflags = params[:exeflags]

for i in 1:manager.np
io, pobj = open(detach(
pobj = open(detach(
setenv(`$(julia_cmd(exename)) $exeflags --bind-to $(LPROC.bind_addr) --worker`, dir=dir)), "r")
wconfig = WorkerConfig()
wconfig.process = pobj
wconfig.io = io
wconfig.io = pobj.out
push!(launched, wconfig)
end

Expand Down
4 changes: 2 additions & 2 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1269,8 +1269,8 @@ function launch_additional(np::Integer, cmd::Cmd)
addresses = cell(np)

for i in 1:np
io, pobj = open(detach(cmd), "r")
io_objs[i] = io
pobj = open(detach(cmd), "r")
io_objs[i] = pobj.out
end

for (i,io) in enumerate(io_objs)
Expand Down
30 changes: 18 additions & 12 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -465,46 +465,52 @@ eachline(cmd::AbstractCmd) = eachline(cmd, DevNull)

# return a Process object to read-to/write-from the pipeline
function open(cmds::AbstractCmd, mode::AbstractString="r", other::AsyncStream=DevNull)
if mode == "r"
if mode == "r+" || mode == "w+"
other === DevNull || throw(ArgumentError("no other stream for mode rw+"))
in = Pipe()
out = Pipe()
processes = spawn(cmds, (in,out,STDERR))
close(in.out)
close(out.in)
elseif mode == "r"
in = other
out = io = Pipe()
out = Pipe()
processes = spawn(cmds, (in,out,STDERR))
close(out.in)
elseif mode == "w"
in = io = Pipe()
in = Pipe()
out = other
processes = spawn(cmds, (in,out,STDERR))
close(in.out)
else
throw(ArgumentError("mode must be \"r\" or \"w\", not \"$mode\""))
end
return (io, processes)
return processes
end

function open(f::Function, cmds::AbstractCmd, args...)
io, P = open(cmds, args...)
P = open(cmds, args...)
ret = try
f(io)
f(P)
catch
kill(P)
rethrow()
finally
close(io)
close(P)
end
success(P) || pipeline_error(P)
return ret
end

# TODO: deprecate this
function readandwrite(cmds::AbstractCmd)
in = Pipe()
out, processes = open(cmds, "r", in)
(out, in, processes)
processes = open(cmds, "r+")
(processes.out, processes.in, processes)
end

function readbytes(cmd::AbstractCmd, stdin::AsyncStream=DevNull)
out, procs = open(cmd, "r", stdin)
bytes = readbytes(out)
procs = open(cmd, "r", stdin)
bytes = readbytes(procs.out)
!success(procs) && pipeline_error(procs)
return bytes
end
Expand Down
4 changes: 2 additions & 2 deletions examples/clustermanager/simple/UnixDomainCM.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ function launch(manager::UnixDomainCM, params::Dict, launched::Array, c::Conditi
sockname = tempname()
try
cmd = `$(params[:exename]) $(@__FILE__) udwrkr $sockname`
io, pobj = open(cmd, "r")
pobj = open(cmd, "r")

wconfig = WorkerConfig()
wconfig.userdata = Dict(:sockname=>sockname, :io=>io, :process=>pobj)
wconfig.userdata = Dict(:sockname=>sockname, :io=>pobj.out, :process=>pobj)
push!(launched, wconfig)
notify(c)
catch e
Expand Down
5 changes: 3 additions & 2 deletions test/examples.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ include(joinpath(dir, "queens.jl"))
script = joinpath(dir, "clustermanager/simple/test_simple.jl")
cmd = `$(joinpath(JULIA_HOME,Base.julia_exename())) $script`

(strm, proc) = open(cmd)
proc = open(cmd)
errors = readall(proc)
wait(proc)
if !success(proc) && ccall(:jl_running_on_valgrind,Cint,()) == 0
println(readall(strm))
println(errors)
error("UnixDomainCM failed test, cmd : $cmd")
end
end
Expand Down
4 changes: 2 additions & 2 deletions test/spawn.jl
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,10 @@ sleep(1)
import Base.zzzInvalidIdentifier
"""
try
(in,p) = open(`$exename -f`, "w")
in = open(`$exename -f`, "w")
write(in,cmd)
close(in)
wait(p)
wait(in)
catch
error("IOStream redirect failed. Child stderr was \n$(readall(fname))\n")
end
Expand Down

0 comments on commit 955fab4

Please sign in to comment.