forked from puma/puma
-
Notifications
You must be signed in to change notification settings - Fork 0
/
thread_pool.rb
394 lines (329 loc) · 10.9 KB
/
thread_pool.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# frozen_string_literal: true
require 'thread'
module Puma
# Internal Docs for A simple thread pool management object.
#
# Each Puma "worker" has a thread pool to process requests.
#
# First a connection to a client is made in `Puma::Server`. It is wrapped in a
# `Puma::Client` instance and then passed to the `Puma::Reactor` to ensure
# the whole request is buffered into memory. Once the request is ready, it is passed into
# a thread pool via the `Puma::ThreadPool#<<` operator where it is stored in a `@todo` array.
#
# Each thread in the pool has an internal loop where it pulls a request from the `@todo` array
# and proceses it.
class ThreadPool
class ForceShutdown < RuntimeError
end
# How long, after raising the ForceShutdown of a thread during
# forced shutdown mode, to wait for the thread to try and finish
# up its work before leaving the thread to die on the vine.
SHUTDOWN_GRACE_TIME = 5 # seconds
# Maintain a minimum of +min+ and maximum of +max+ threads
# in the pool.
#
# The block passed is the work that will be performed in each
# thread.
#
def initialize(min, max, *extra, &block)
@not_empty = ConditionVariable.new
@not_full = ConditionVariable.new
@mutex = Mutex.new
@todo = []
@spawned = 0
@waiting = 0
@min = Integer(min)
@max = Integer(max)
@block = block
@extra = extra
@shutdown = false
@trim_requested = 0
@out_of_band_pending = false
@workers = []
@auto_trim = nil
@reaper = nil
@mutex.synchronize do
@min.times do
spawn_thread
@not_full.wait(@mutex)
end
end
@clean_thread_locals = false
@force_shutdown = false
@shutdown_mutex = Mutex.new
end
attr_reader :spawned, :trim_requested, :waiting
attr_accessor :clean_thread_locals
attr_accessor :out_of_band_hook # @version 5.0.0
def self.clean_thread_locals
Thread.current.keys.each do |key| # rubocop: disable Performance/HashEachMethods
Thread.current[key] = nil unless key == :__recursive_key__
end
end
# How many objects have yet to be processed by the pool?
#
def backlog
with_mutex { @todo.size }
end
# @!attribute [r] pool_capacity
def pool_capacity
waiting + (@max - spawned)
end
# @!attribute [r] busy_threads
# @version 5.0.0
def busy_threads
with_mutex { @spawned - @waiting + @todo.size }
end
# :nodoc:
#
# Must be called with @mutex held!
#
def spawn_thread
@spawned += 1
th = Thread.new(@spawned) do |spawned|
Puma.set_thread_name 'threadpool %03i' % spawned
todo = @todo
block = @block
mutex = @mutex
not_empty = @not_empty
not_full = @not_full
extra = @extra.map { |i| i.new }
while true
work = nil
mutex.synchronize do
while todo.empty?
if @trim_requested > 0
@trim_requested -= 1
@spawned -= 1
@workers.delete th
Thread.exit
end
@waiting += 1
if @out_of_band_pending && trigger_out_of_band_hook
@out_of_band_pending = false
end
not_full.signal
begin
not_empty.wait mutex
ensure
@waiting -= 1
end
end
work = todo.shift
end
if @clean_thread_locals
ThreadPool.clean_thread_locals
end
begin
@out_of_band_pending = true if block.call(work, *extra)
rescue Exception => e
STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
end
end
end
@workers << th
th
end
private :spawn_thread
# @version 5.0.0
def trigger_out_of_band_hook
return false unless out_of_band_hook && out_of_band_hook.any?
# we execute on idle hook when all threads are free
return false unless @spawned == @waiting
out_of_band_hook.each(&:call)
true
rescue Exception => e
STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})"
true
end
private :trigger_out_of_band_hook
# @version 5.0.0
def with_mutex(&block)
@mutex.owned? ?
yield :
@mutex.synchronize(&block)
end
# Add +work+ to the todo list for a Thread to pickup and process.
def <<(work)
with_mutex do
if @shutdown
raise "Unable to add work while shutting down"
end
@todo << work
if @waiting < @todo.size and @spawned < @max
spawn_thread
end
@not_empty.signal
end
end
# This method is used by `Puma::Server` to let the server know when
# the thread pool can pull more requests from the socket and
# pass to the reactor.
#
# The general idea is that the thread pool can only work on a fixed
# number of requests at the same time. If it is already processing that
# number of requests then it is at capacity. If another Puma process has
# spare capacity, then the request can be left on the socket so the other
# worker can pick it up and process it.
#
# For example: if there are 5 threads, but only 4 working on
# requests, this method will not wait and the `Puma::Server`
# can pull a request right away.
#
# If there are 5 threads and all 5 of them are busy, then it will
# pause here, and wait until the `not_full` condition variable is
# signaled, usually this indicates that a request has been processed.
#
# It's important to note that even though the server might accept another
# request, it might not be added to the `@todo` array right away.
# For example if a slow client has only sent a header, but not a body
# then the `@todo` array would stay the same size as the reactor works
# to try to buffer the request. In that scenario the next call to this
# method would not block and another request would be added into the reactor
# by the server. This would continue until a fully bufferend request
# makes it through the reactor and can then be processed by the thread pool.
def wait_until_not_full
with_mutex do
while true
return if @shutdown
# If we can still spin up new threads and there
# is work queued that cannot be handled by waiting
# threads, then accept more work until we would
# spin up the max number of threads.
return if busy_threads < @max
@not_full.wait @mutex
end
end
end
# @version 5.0.0
def wait_for_less_busy_worker(delay_s)
return unless delay_s && delay_s > 0
# Ruby MRI does GVL, this can result
# in processing contention when multiple threads
# (requests) are running concurrently
return unless Puma.mri?
with_mutex do
return if @shutdown
# do not delay, if we are not busy
return unless busy_threads > 0
# this will be signaled once a request finishes,
# which can happen earlier than delay
@not_full.wait @mutex, delay_s
end
end
# If there are any free threads in the pool, tell one to go ahead
# and exit. If +force+ is true, then a trim request is requested
# even if all threads are being utilized.
#
def trim(force=false)
with_mutex do
free = @waiting - @todo.size
if (force or free > 0) and @spawned - @trim_requested > @min
@trim_requested += 1
@not_empty.signal
end
end
end
# If there are dead threads in the pool make them go away while decreasing
# spawned counter so that new healthy threads could be created again.
def reap
with_mutex do
dead_workers = @workers.reject(&:alive?)
dead_workers.each do |worker|
worker.kill
@spawned -= 1
end
@workers.delete_if do |w|
dead_workers.include?(w)
end
end
end
class Automaton
def initialize(pool, timeout, thread_name, message)
@pool = pool
@timeout = timeout
@thread_name = thread_name
@message = message
@running = false
end
def start!
@running = true
@thread = Thread.new do
Puma.set_thread_name @thread_name
while @running
@pool.public_send(@message)
sleep @timeout
end
end
end
def stop
@running = false
@thread.wakeup
end
end
def auto_trim!(timeout=30)
@auto_trim = Automaton.new(self, timeout, "threadpool trimmer", :trim)
@auto_trim.start!
end
def auto_reap!(timeout=5)
@reaper = Automaton.new(self, timeout, "threadpool reaper", :reap)
@reaper.start!
end
# Allows ThreadPool::ForceShutdown to be raised within the
# provided block if the thread is forced to shutdown during execution.
def with_force_shutdown
t = Thread.current
@shutdown_mutex.synchronize do
raise ForceShutdown if @force_shutdown
t[:with_force_shutdown] = true
end
yield
ensure
t[:with_force_shutdown] = false
end
# Tell all threads in the pool to exit and wait for them to finish.
# Wait +timeout+ seconds then raise +ForceShutdown+ in remaining threads.
# Next, wait an extra +grace+ seconds then force-kill remaining threads.
# Finally, wait +kill_grace+ seconds for remaining threads to exit.
#
def shutdown(timeout=-1)
threads = with_mutex do
@shutdown = true
@trim_requested = @spawned
@not_empty.broadcast
@not_full.broadcast
@auto_trim.stop if @auto_trim
@reaper.stop if @reaper
# dup workers so that we join them all safely
@workers.dup
end
if timeout == -1
# Wait for threads to finish without force shutdown.
threads.each(&:join)
else
join = ->(inner_timeout) do
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
threads.reject! do |t|
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
t.join inner_timeout - elapsed
end
end
# Wait +timeout+ seconds for threads to finish.
join.call(timeout)
# If threads are still running, raise ForceShutdown and wait to finish.
@shutdown_mutex.synchronize do
@force_shutdown = true
threads.each do |t|
t.raise ForceShutdown if t[:with_force_shutdown]
end
end
join.call(SHUTDOWN_GRACE_TIME)
# If threads are _still_ running, forcefully kill them and wait to finish.
threads.each(&:kill)
join.call(1)
end
@spawned = 0
@workers = []
end
end
end