Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for receive timeouts #217

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/comm.jl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,29 @@ function _recv!(socket::Socket, zmsg)
if -1 == msg_recv(socket, zmsg, ZMQ_DONTWAIT)
zmq_errno() == EAGAIN || throw(StateError(jl_zmq_error_str()))
while socket.events & POLLIN== 0
wait(socket)
# If there is no receive timeout, just wait on the socket
if socket.rcvtimeo == -1
wait(socket)
else
# Otherwise, implement a receive timeout for the wait. We
# can't rely on ZMQ's native blocking behaviour because that
# doesn't play nicely with Julia's tasks.
timeout_secs = socket.rcvtimeo / 1000
result_chnl = Channel()

# One of these tasks will write to result_chnl first, at
# which point the channel will be closed and the other task
# terminated.
zmq_task = @async (wait(socket); put!(result_chnl, :ok))
timeout_task = @async (sleep(timeout_secs); put!(result_chnl, :timed_out))
bind(result_chnl, zmq_task)
bind(result_chnl, timeout_task)

# If there was a timeout throw an error, otherwise continue looping
if take!(result_chnl) == :timed_out
error("ZMQ receive timed out")
end
end
end
else
notify_is_expensive = !isempty(getfield(socket,:pollfd).notify.waitq)
Expand Down
27 changes: 27 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,33 @@ end
ZMQ.send(s1, "another test response")
wait(c)

# Set s1's receive timeout to 0.5s, and check that it throws when there are
# no incoming messages.
s1.rcvtimeo = 500
recv_timeout_elapsed = @elapsed @test_throws ErrorException ZMQ.recv(s1)
@test recv_timeout_elapsed >= s1.rcvtimeo / 1000

# Test that the receive timeout functionality yields and doesn't block
c = Base.Condition()
msg_sent = false
# Set the receive timeout to something large
s1.rcvtimeo = 10_000
@async begin
global msg_sent
sleep(0.5)
msg_sent = true
ZMQ.send(s2, "foo request")
@test ZMQ.recv(s2, String) == "bar response"
notify(c)
end

@test ZMQ.recv(s1, String) == "foo request"
@test msg_sent == true
ZMQ.send(s1, "bar response")
wait(c)
# Reset the timeout for the rest of the tests
s1.rcvtimeo = -1

ZMQ.send(s2, Message("another test request"))
msg = ZMQ.recv(s1)
o=convert(IOStream, msg)
Expand Down