/
selector.rb
101 lines (85 loc) · 2.92 KB
/
selector.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
require "agent/uuid"
require "agent/push"
require "agent/pop"
require "agent/channel"
require "agent/notifier"
require "agent/errors"
module Agent
def self.select!
raise BlockMissing unless block_given?
selector = Agent::Selector.new
yield selector
selector.select
ensure
selector && selector.dequeue_unrunnable_operations
end
class Selector
attr_reader :cases
class DefaultCaseAlreadyDefinedError < Exception; end
Case = Struct.new(:uuid, :channel, :direction, :value, :blk)
def initialize
@ordered_cases = []
@cases = {}
@operations = {}
@once = Once.new
@notifier = Notifier.new
end
def default(&blk)
if @default_case
@default_case.channel.close
raise DefaultCaseAlreadyDefinedError
else
@default_case = self.case(channel!(:type => TrueClass), :receive, &blk)
end
end
def timeout(t, &blk)
s = channel!(:type => TrueClass)
go!{ sleep t; s.send(true); s.close }
self.case(s, :receive, &blk)
end
def case(chan, direction, value=nil, &blk)
raise "invalid case, must be a channel" unless chan.is_a?(Agent::Channel)
raise BlockMissing unless blk
uuid = Agent::UUID.generate
cse = Case.new(uuid, chan, direction, value, blk)
@ordered_cases << cse
@cases[uuid] = cse
@operations[chan] = []
cse
end
def select
if !@ordered_cases.empty?
options = {:once => @once, :notifier => @notifier, :deferred => true}
@ordered_cases.each do |cse|
if cse.direction == :send
@operations[cse.channel] << cse.channel.send(cse.value, :uuid => cse.uuid,
:once => @once,
:notifier => @notifier,
:deferred => true)
else # :receive
@operations[cse.channel] << cse.channel.receive(:uuid => cse.uuid,
:once => @once,
:notifier => @notifier,
:deferred => true)
end
end
if @default_case
@default_case.channel.send(true, :uuid => @default_case.uuid, :once => @once, :notifier => @notifier, :deferred => true)
end
@notifier.wait
operation = @notifier.payload
if operation.is_a?(Push)
@cases[operation.uuid].blk.call
else # Pop
@cases[operation.uuid].blk.call(operation.object)
end
@default_case.channel.close if @default_case
end
end
def dequeue_unrunnable_operations
@operations.each do |channel, operations|
channel.remove_operations(operations)
end
end
end
end