-
Notifications
You must be signed in to change notification settings - Fork 31
/
manager.rb
129 lines (108 loc) · 3 KB
/
manager.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# frozen_string_literal: true
require "faktory/util"
require "faktory/processor"
require "faktory/fetch"
require "set"
module Faktory
##
# The Manager is the central coordination point in Faktory, controlling
# the lifecycle of the Processors.
#
# Tasks:
#
# 1. start: Spin up Processors.
# 3. processor_died: Handle job failure, throw away Processor, create new one.
# 4. quiet: shutdown idle Processors.
# 5. stop: hard stop the Processors by deadline.
#
# Note that only the last task requires its own Thread since it has to monitor
# the shutdown process. The other tasks are performed by other threads.
#
class Manager
include Util
attr_reader :threads
attr_reader :options
def initialize(options = {})
logger.debug { options.inspect }
@options = options
@count = options[:concurrency] || 25
raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
@done = false
@threads = Set.new
@count.times do
@threads << Processor.new(self)
end
@plock = Mutex.new
end
def start
@threads.each do |x|
x.start
end
end
def quiet
return if @done
@done = true
logger.info { "Terminating quiet threads" }
@threads.each { |x| x.terminate }
fire_event(:quiet, true)
end
# hack for quicker development / testing environment
PAUSE_TIME = $stdout.tty? ? 0.1 : 0.5
def stop(deadline)
quiet
fire_event(:shutdown, true)
# some of the shutdown events can be async,
# we don't have any way to know when they're done but
# give them a little time to take effect
sleep PAUSE_TIME
return if @threads.empty?
logger.info { "Pausing to allow threads to finish..." }
remaining = deadline - Time.now
while remaining > PAUSE_TIME
return if @threads.empty?
sleep PAUSE_TIME
remaining = deadline - Time.now
end
return if @threads.empty?
hard_shutdown
end
def processor_stopped(processor)
@plock.synchronize do
@threads.delete(processor)
end
end
def processor_died(processor, reason)
@plock.synchronize do
@threads.delete(processor)
unless @done
p = Processor.new(self)
@threads << p
p.start
end
end
end
def stopped?
@done
end
private
def hard_shutdown
# We've reached the timeout and we still have busy threads.
# They must die but their jobs shall live on.
cleanup = nil
@plock.synchronize do
cleanup = @threads.dup
end
if cleanup.size > 0
jobs = cleanup.map { |p| p.job }.compact
logger.warn { "Terminating #{cleanup.size} busy worker threads" }
logger.warn { "Work still in progress #{jobs.inspect}" }
end
cleanup.each do |processor|
processor.kill
end
cleanup.each do |processor|
processor.thread.join(1)
end
end
end
end