-
Notifications
You must be signed in to change notification settings - Fork 25
/
reactor.rb
93 lines (80 loc) · 2.41 KB
/
reactor.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
module Celluloid
module ZMQ
# React to incoming 0MQ and Celluloid events. Loosely modeled on the Reactor
# design pattern: a task registers interest in a socket and suspends itself,
# and #run_once resumes it once the poller reports that socket as ready.
class Reactor
  extend Forwardable

  # Poller timeout (milliseconds) that waits until an event arrives.
  # Per the CZTop poller documentation, 0 means "return immediately" and
  # -1 means "wait indefinitely".
  BLOCKING_TIMEOUT_MS = -1
  private_constant :BLOCKING_TIMEOUT_MS

  # Reactor#wakeup forwards to @waker.signal.
  def_delegator :@waker, :signal, :wakeup
  # Reactor#shutdown forwards to @waker.cleanup.
  def_delegator :@waker, :cleanup, :shutdown

  # Build the poller and the waiter tables, and register the waker's socket
  # as a reader so that #run_once can notice wakeups.
  def initialize
    @waker = Waker.new
    @poller = ::CZTop::Poller::Aggregated.new
    @readers = {} # socket => task waiting for it to become readable
    @writers = {} # socket => task waiting for it to become writable

    @poller.add_reader(@waker.socket)
  end

  # Wait for the given ZMQ socket to become readable.
  #
  # @param socket the socket to watch
  # @return the same socket
  # @raise [ArgumentError] if a task is already waiting to read from it
  def wait_readable(socket)
    monitor_zmq socket, @readers, :read
  end

  # Wait for the given ZMQ socket to become writable.
  #
  # @param socket the socket to watch
  # @return the same socket
  # @raise [ArgumentError] if a task is already waiting to write to it
  def wait_writable(socket)
    monitor_zmq socket, @writers, :write
  end

  # Register the current task as the waiter for +socket+ and suspend it.
  #
  # @param socket the socket to watch
  # @param set [Hash] waiter table (socket => task) to record the task in
  # @param type [Symbol] :read or :write
  # @return the same socket, once Task.suspend returns
  # @raise [ArgumentError] if the socket already has a waiter in +set+, or
  #   if +type+ is neither :read nor :write
  def monitor_zmq(socket, set, type)
    if set.key?(socket)
      raise ArgumentError, "another method is already waiting on #{socket.inspect}"
    end

    # Register with the poller before touching the waiter table, so that a
    # bad +type+ (or a failing registration) cannot leave a stale entry that
    # would block every later wait on this socket.
    case type
    when :read
      @poller.add_reader(socket)
    when :write
      @poller.add_writer(socket)
    else
      raise ArgumentError, "wrong type: #{type.inspect}"
    end

    set[socket] = Task.current
    Task.suspend :zmqwait
    socket
  end

  # Run the reactor once: wait for events, then resume every task whose
  # socket became ready. A readable event on the waker's socket is consumed
  # via @waker.wait instead of resuming a task.
  #
  # @param timeout [Numeric, nil] seconds to wait; nil waits until an event
  #   arrives
  # @raise [IOError] if polling raises a StandardError
  def run_once(timeout = nil)
    # The poller documents an Integer number of milliseconds, so round a
    # fractional timeout up rather than passing a Float through.
    timeout_ms = timeout ? (timeout * 1000).ceil : BLOCKING_TIMEOUT_MS

    begin
      @poller.wait(timeout_ms)
    rescue => e
      raise IOError, "ZMQ poll error: #{e.message}"
    end

    @poller.readables.each do |sock|
      if sock == @waker.socket
        @waker.wait
        next
      end

      # Deregister before resuming so the resumed task may wait again.
      task = @readers.delete(sock)
      @poller.remove_reader(sock)

      if task
        task.resume
      else
        Celluloid::Logger.debug "ZMQ error: got read event without associated reader"
      end
    end

    @poller.writables.each do |sock|
      task = @writers.delete(sock)
      @poller.remove_writer(sock)

      if task
        task.resume
      else
        Celluloid::Logger.debug "ZMQ error: got write event without associated writer"
      end
    end
  end
end
end
end