/
innertube.rb
188 lines (169 loc) · 5.4 KB
/
innertube.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
require 'thread'
# Innertube is a re-entrant thread-safe resource pool that was
# extracted from the Riak Ruby Client
# (https://github.com/basho/riak-ruby-client).
# @see Pool
module Innertube
# A re-entrant thread-safe resource pool that generates new resources on
# demand.
# @private
class Pool
# Raised when a taken element should be deleted from the pool.
class BadResource < RuntimeError; end
# An element of the pool. Comprises an object with an owning
# thread. Not usually needed by user code, and should not be
# modified outside the {Pool}'s lock.
class Element
attr_reader :object, :owner
# Creates a pool element
# @param [Object] object the resource to wrap into the pool element
def initialize(object)
@object = object
@owner = nil
end
# Claims this element of the pool for the current Thread.
# Do not call this manually, it is only used from inside the pool.
def lock
@owner = Thread.current
end
# @return [true,false] Is this element locked/claimed?
def locked?
!unlocked?
end
# Releases this element of the pool from the current Thread.
def unlock
@owner = nil
end
# @return [true,false] Is this element available for use?
def unlocked?
owner.nil?
end
end
# Creates a new resource pool.
# @param [Proc, #call] open a callable which allocates a new object for the
# pool
# @param [Proc, #call] close a callable which is called with an
# object before it is freed.
def initialize(open, close)
@open = open
@close = close
@lock = Mutex.new
@iterator = Mutex.new
@element_released = ConditionVariable.new
@pool = Set.new
end
# On each element of the pool, calls close(element) and removes it.
# @private
def clear
each_element do |e|
delete_element e
end
end
alias :close :clear
# Deletes an element of the pool. Calls the close callback on its object.
# Not intended for external use.
# @param [Element] e the element to remove from the pool
def delete_element(e)
@close.call(e.object)
@lock.synchronize do
@pool.delete e
end
end
private :delete_element
# Locks each element in turn and closes/deletes elements for which the
# object passes the block.
# @yield [object] a block that should determine whether an element
# should be deleted from the pool
# @yieldparam [Object] object the resource
def delete_if
raise ArgumentError, "block required" unless block_given?
each_element do |e|
if yield e.object
delete_element e
end
end
end
# Acquire an element of the pool. Yields the object. If all
# elements are claimed, it will create another one.
# @yield [resource] a block that will perform some action with the
# element of the pool
# @yieldparam [Object] resource a resource managed by the pool.
# Locked for the duration of the block
# @param [Proc, #call] :filter a callable which receives objects and has
# the opportunity to reject each in turn.
# @param [Object] :default if no resources are available, use this object
# instead of calling #open.
# @private
def take(opts = {})
raise ArgumentError, "block required" unless block_given?
result = nil
element = nil
opts[:filter] ||= proc {|_| true }
@lock.synchronize do
element = @pool.find { |e| e.unlocked? && opts[:filter].call(e.object) }
unless element
# No objects were acceptable
resource = opts[:default] || @open.call
element = Element.new(resource)
@pool << element
end
element.lock
end
begin
result = yield element.object
rescue BadResource
delete_element element
raise
ensure
# Unlock
if element
element.unlock
@element_released.signal
end
end
result
end
alias >> take
# Iterate over a snapshot of the pool. Yielded objects are locked
# for the duration of the block. This may block the current thread
# until elements in the snapshot are released by other threads.
# @yield [element] a block that will do something with each
# element in the pool
# @yieldparam [Element] element the current element in the
# iteration
def each_element
targets = @pool.to_a
unlocked = []
@iterator.synchronize do
until targets.empty?
@lock.synchronize do
unlocked, targets = targets.partition {|e| e.unlocked? }
unlocked.each {|e| e.lock }
end
unlocked.each do |e|
begin
yield e
ensure
e.unlock
end
end
@element_released.wait(@iterator) unless targets.empty?
end
end
end
# As each_element, but yields objects, not wrapper elements.
# @yield [resource] a block that will do something with each
# resource in the pool
# @yieldparam [Object] resource the current resource in the
# iteration
def each
each_element do |e|
yield e.object
end
end
# @return [Integer] the number of the resources in the pool
def size
@lock.synchronize { @pool.size }
end
end
end