This repository has been archived by the owner on Nov 15, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
queue.rb
90 lines (80 loc) · 1.66 KB
/
queue.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# Outer class; only the nested Queue class is defined in this part of the file.
class Yoleaux
  # An object queue built on top of an IO.pipe.
  #
  # Objects are serialised with Marshal on the write end and deserialised on
  # the read end. Because it is backed by real file descriptors, a Queue can
  # be waited on with Queue.select alongside ordinary IO objects.
  #
  # NOTE: Marshal.load can instantiate arbitrary objects, so the write end of
  # the pipe must only ever be handed to trusted code.
  class Queue
    # IO.select wrapper that accepts Queue objects as well as plain IOs.
    #
    # Each Queue is replaced by its underlying pipe end (reader for +r+,
    # writer for +w+, both ends for +e+) before calling IO.select, and the IOs
    # in the result are mapped back to the Queue they came from.
    #
    # @param r [Array<Queue, IO>] objects to wait on for readability
    # @param w [Array<Queue, IO>] objects to wait on for writability
    # @param e [Array<Queue, IO>] objects to wait on for exceptional conditions
    # @param timeout [Numeric, nil] seconds to wait; nil waits indefinitely
    # @return [Array(Array, Array, Array), nil] ready objects in the same
    #   three-array shape as IO.select, or nil if the timeout expired
    def self.select(r, w = [], e = [], timeout = nil)
      rmaps = {}
      wmaps = {}
      emaps = {}

      rios = r.map do |q|
        next q unless q.is_a?(Queue)

        rmaps[q.reader] = q
        q.reader
      end

      wios = w.map do |q|
        next q unless q.is_a?(Queue)

        wmaps[q.writer] = q
        q.writer
      end

      # A queue has two descriptors, so both are watched for errors.
      eios = e.map do |q|
        next q unless q.is_a?(Queue)

        emaps[q.writer] = q
        emaps[q.reader] = q
        [q.reader, q.writer]
      end.flatten

      result = IO.select(rios, wios, eios, timeout)
      return nil if result.nil?

      # Translate raw pipe ends back to their owning Queue; plain IOs that
      # were passed in directly are returned unchanged.
      result[0].map! { |io| rmaps.fetch(io, io) }
      result[1].map! { |io| wmaps.fetch(io, io) }
      result[2].map! { |io| emaps.fetch(io, io) }
      result
    end

    # Creates the underlying pipe.
    def initialize
      @read, @write = IO.pipe
    end

    # Serialises +obj+ with Marshal and writes it to the pipe.
    #
    # This deliberately shadows Object#send on Queue instances; use
    # +__send__+ for dynamic dispatch on a Queue.
    #
    # @param obj [Object] any Marshal-dumpable object
    # @return [Integer] the value returned by IO#write (bytes written)
    def send(obj)
      rep = Marshal.dump(obj)
      @write.write(rep)
    end

    # Reads and deserialises the next object from the pipe, blocking until
    # one is available.
    #
    # @return [Object] the next object in the queue
    def read
      # SECURITY: Marshal.load is unsafe on untrusted input; see class notes.
      Marshal.load(@read)
    end
    alias receive read
    alias recv read

    # Sends +obj+ only if the write end is currently writable.
    #
    # @param obj [Object] any Marshal-dumpable object
    # @return [Integer, nil] result of #send, or nil if the pipe was not
    #   writable right now
    def send_nonblock(obj)
      IO.select([], [@write], [], 0) && send(obj)
    end

    # Reads the next object only if the read end currently has data.
    #
    # @return [Object, nil] the next object, or nil if nothing is ready
    def read_nonblock
      # Previously this called `send obj` with an undefined `obj`, which
      # raised NameError whenever data was actually available.
      IO.select([@read], [], [], 0) && read
    end
    alias receive_nonblock read_nonblock
    # Previously aliased to the blocking #read by mistake.
    alias recv_nonblock read_nonblock

    # Yields each object from the queue until the read end reaches EOF
    # (i.e. every write end has been closed and the data has been drained).
    #
    # @yieldparam obj [Object] each received object
    # @return [Enumerator] if no block is given
    def each(&block)
      return to_enum(:each) unless block

      block.call(receive) until @read.eof?
    end

    # Closes the read end of the pipe.
    def close_read
      @read.close
    end

    # Closes the write end of the pipe.
    def close_write
      @write.close
    end

    # @return [IO] the read end of the pipe
    def reader
      @read
    end

    # @return [IO] the write end of the pipe
    def writer
      @write
    end

    # @return [String] a description showing both pipe ends
    def inspect
      "#<#{self.class} fd:#{@read.inspect},#{@write.inspect}>"
    end
  end
end