Skip to content

Commit

Permalink
Alternative fix for #3567
Browse files Browse the repository at this point in the history
As discussed in #5825, this puts handles that should be preserved into a global ObjectIdDict. The only slightly sketchy parts are the idle and async worker invocations which I'll have to think about (we don't have a good interface for them yet, really), but it's not as simple as just putting a wait() based interface on top, since people are using them for synchronization with threaded C libraries.
  • Loading branch information
Keno committed Feb 22, 2014
1 parent 0b2d867 commit 933487c
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 78 deletions.
12 changes: 6 additions & 6 deletions base/poll.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type FileMonitor
end
this = new(handle,cb,false,Condition())
associate_julia_struct(handle,this)
finalizer(this,close)
finalizer(this,uvfinalize)
this
end
FileMonitor(file) = FileMonitor(false,file)
Expand Down Expand Up @@ -80,7 +80,7 @@ type PollingFileWatcher <: UVPollingWatcher
end
this = new(handle, file, false, Condition(), cb)
associate_julia_struct(handle,this)
finalizer(this,close)
finalizer(this,uvfinalize)
this
end
PollingFileWatcher(file) = PollingFileWatcher(false,file)
Expand Down Expand Up @@ -116,7 +116,7 @@ function FDWatcher(fd::RawFD)
end
this = FDWatcher(handle,fd,false,Condition(),false,FDEvent())
associate_julia_struct(handle,this)
finalizer(this,close)
finalizer(this,uvfinalize)
this
end
@windows_only function FDWatcher(fd::WindowsRawSocket)
Expand All @@ -129,7 +129,7 @@ end
end
this = FDWatcher(handle,fd,false,Condition(),false,FDEvent())
associate_julia_struct(handle,this)
finalizer(this,close)
finalizer(this,uvfinalize)
this
end

Expand Down Expand Up @@ -220,15 +220,15 @@ function wait(pfw::PollingFileWatcher; interval=2.0)
if !pfw.open
start_watching(pfw_wait_cb,pfw,interval)
end
prev,curr = wait(pfw.notify)
prev,curr = stream_wait(pfw,pfw.notify)
if isempty(pfw.notify.waitq)
stop_watching(pfw)
end
(prev,curr)
end

function wait(m::FileMonitor)
err, filename, events = wait(m.notify)
err, filename, events = stream_wait(m,m.notify)
filename, events
end

Expand Down
2 changes: 1 addition & 1 deletion base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ macro cmd(str)
:(cmd_gen($(shell_parse(str)[1])))
end

wait(x::Process) = if !process_exited(x); wait(x.exitnotify); end
wait(x::Process) = if !process_exited(x); stream_wait(x,x.exitnotify); end
wait(x::ProcessChain) = for p in x.processes; wait(p); end

show(io::IO, p::Process) = print(io, "Process(", p.cmd, ", ", process_status(p), ")")
24 changes: 21 additions & 3 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ type TcpSocket <: Socket
end
function TcpSocket()
this = TcpSocket(c_malloc(_sizeof_uv_tcp))
associate_julia_struct(this.handle, this)
associate_julia_struct(this.handle,this)
finalizer(this,uvfinalize)
err = ccall(:uv_tcp_init,Cint,(Ptr{Void},Ptr{Void}),
eventloop(),this.handle)
if err != 0
Expand Down Expand Up @@ -289,6 +290,7 @@ end
function TcpServer()
this = TcpServer(c_malloc(_sizeof_uv_tcp))
associate_julia_struct(this.handle, this)
finalizer(this,uvfinalize)
err = ccall(:uv_tcp_init,Cint,(Ptr{Void},Ptr{Void}),
eventloop(),this.handle)
if err != 0
Expand All @@ -300,6 +302,22 @@ function TcpServer()
this
end

# Internal version of close that doesn't error when called on an unitialized socket, as well as disassociating the socket immidiately
# This is fine because if we're calling this from a finalizer, nobody can be possibly waiting for the close to go through
function uvfinalize(uv)
close(uv)
disassociate_julia_struct(uv)
uv.handle = 0
end

function uvfinalize(uv::Union(TTY,Pipe,TcpServer,TcpSocket))
if (uv.status != StatusUninit && uv.status != StatusInit)
close(uv)
end
disassociate_julia_struct(uv)
uv.handle = 0
end

isreadable(io::TcpSocket) = true
iswritable(io::TcpSocket) = true

Expand Down Expand Up @@ -435,7 +453,7 @@ function recv(sock::UdpSocket)
error("Invalid socket state")
end
_recv_start(sock)
wait(sock.recvnotify)::Vector{Uint8}
stream_wait(sock,sock.recvnotify)::Vector{Uint8}
end

function _uv_hook_recv(sock::UdpSocket, nread::Ptr{Void}, buf_addr::Ptr{Void}, buf_size::Int32, addr::Ptr{Void}, flags::Int32)
Expand Down Expand Up @@ -463,7 +481,7 @@ function send(sock::UdpSocket,ipaddr,port,msg)
error("Invalid socket state")
end
uv_error("send",_send(sock,ipaddr,uint16(port),msg))
wait(sock.sendnotify)
stream_wait(sock,sock.sendnotify)
nothing
end

Expand Down
Loading

0 comments on commit 933487c

Please sign in to comment.