-
Notifications
You must be signed in to change notification settings - Fork 31
/
processor.rb
175 lines (158 loc) · 4.46 KB
/
processor.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# frozen_string_literal: true
require "faktory/util"
require "faktory/fetch"
require "faktory/job_logger"
module Faktory
##
# The Processor is a standalone thread which:
#
# 1. fetches a job
# 2. executes the job
# a. instantiate the Worker
# b. run the middleware chain
# c. call #perform
#
# A Processor can exit due to shutdown (processor_stopped)
# or due to an error during job execution (processor_died)
#
# If an error occurs in the job execution, the
# Processor calls the Manager to create a new one
# to replace itself and exits.
#
class Processor
include Util
# Thread running this processor's loop; nil until #start assigns it.
attr_reader :thread
# The `work.job` value of the unit of work most recently picked up by
# #process_one. It is not reset once that job finishes.
attr_reader :job
# Class-wide state shared by every Processor instance. The mutex guards
# the increments and decrements of @@busy_count made in #process_one.
@@busy_lock = Mutex.new
@@busy_count = 0
# Returns the number of processors that are currently inside #process.
# The counter is read here without taking @@busy_lock.
def self.busy_count
@@busy_count
end
# Builds a processor owned by +mgr+. No thread is created here; call
# #start for that.
#
# mgr - the owning manager. Its #options are read for :reloader and
#       :job_logger (defaulting to Faktory::JobLogger) and are handed to
#       Faktory::Fetcher.
def initialize(mgr)
  opts = mgr.options

  @mgr = mgr
  @down = false
  @done = false
  @thread = nil
  @reloader = opts[:reloader]

  logger_class = opts[:job_logger] || Faktory::JobLogger
  @logging = logger_class.new
  @fetcher = Faktory::Fetcher.new(opts)
end
# Flags the run loop to stop; the loop checks the flag between jobs, so a
# job that is already executing is not interrupted (use #kill for that).
#
# wait - when true, block until the processor thread has finished.
#
# Returns nil when no thread was started or +wait+ is false, otherwise
# the value of the finished thread.
def terminate(wait = false)
  @done = true
  return unless @thread

  wait ? @thread.value : nil
end
# Forcibly stops the processor by raising Faktory::Shutdown inside its
# thread.
#
# wait - when true, block until the processor thread has finished.
#
# Returns nil when no thread was started or +wait+ is false, otherwise
# the value of the finished thread.
def kill(wait = false)
  @done = true
  return unless @thread

  # #terminate, unlike the other actors, never waits for the running job
  # because there is no telling how long that job will take. #kill is the
  # hard stop meant to be called once the shutdown timeout has passed.
  @thread.raise ::Faktory::Shutdown
  wait ? @thread.value : nil
end
# Spawns the processor thread (named "processor") running #run. Calling
# it again is a no-op that returns the thread created the first time.
def start
  return @thread if @thread

  @thread = safe_thread("processor", &method(:run))
end
private unless $TESTING
# Thread body: keeps processing jobs until the done flag is set, then
# tells the manager how the loop ended.
#
# A normal exit or a Faktory::Shutdown is reported via
# processor_stopped; any other exception via processor_died.
def run
  process_one until @done
  @mgr.processor_stopped(self)
rescue Faktory::Shutdown
  @mgr.processor_stopped(self)
rescue Exception => error # rubocop:disable Lint/RescueException
  @mgr.processor_died(self, error)
end
# Fetches a single unit of work and processes it. When nothing was
# fetched it sleeps for one second instead.
#
# The class-wide busy counter is raised for the duration of the job and
# lowered again in an ensure, so it is restored even when #process raises.
def process_one
  work = fetch
  return sleep(1) unless work

  @@busy_lock.synchronize { @@busy_count += 1 }
  begin
    @job = work.job
    process(work)
  ensure
    @@busy_lock.synchronize { @@busy_count -= 1 }
  end
end
# Retrieves the next unit of work from the fetcher.
#
# If a previous fetch had failed (@down holds the time of that failure),
# a successful fetch logs the downtime and clears the marker.
#
# Returns whatever the fetcher returned, nil when interrupted by
# Faktory::Shutdown, or the result of #handle_fetch_exception for any
# other StandardError.
def fetch
  fetched = @fetcher.retrieve_work
  if @down
    logger.info { "Faktory is online, #{Time.now - @down} sec downtime" }
    @down = nil
  end
  fetched
rescue Faktory::Shutdown
  # Shutting down: deliberately swallow the interrupt and hand back nothing.
  nil
rescue => error
  handle_fetch_exception(error)
end
# Handles a failed fetch: logs the error once per outage, then backs off.
#
# Only the first failure of an outage is logged; @down records when the
# outage began and suppresses further log lines until #fetch clears it.
#
# ex - the exception raised while fetching.
#
# Always sleeps one second and returns nil.
def handle_fetch_exception(ex)
  unless @down
    @down = Time.now
    logger.error("Error fetching job: #{ex}")
    # Exception#backtrace is nil for an exception that was never raised
    # (or whose class overrides it). Without the safe navigation that
    # would raise NoMethodError here and take the whole processor down.
    ex.backtrace&.each { |line| logger.error(line) }
  end
  sleep(1)
  nil
end
# Instantiates the worker named by payload["jobtype"] and yields it to
# the caller's block, wrapped in the logging context, the job logger and
# the reloader (in that nesting order).
#
# payload - the job hash; "jobtype", "jid" and "custom" are read here.
#
# Yields the worker instance with its jid and _custom already assigned.
def dispatch(payload)
  Faktory::Logging.with_job_hash_context(payload) do
    @logging.call(payload) do
      # Rails 5 wants code execution wrapped in a Reloader, which takes
      # care of code loading, db connection management and so on. Looking
      # up the worker constant and instantiating it therefore has to
      # happen inside this block, which Rails treats as one "unit of work".
      @reloader.call do
        worker = constantize(payload["jobtype"]).new
        worker.jid = payload["jid"]
        worker._custom = payload["custom"]
        yield worker
      end
    end
  end
end
# Runs one unit of work through the worker middleware and reports the
# outcome back through +work+.
#
# work - the fetched unit of work; must respond to #job, #acknowledge
#        and #fail.
#
# On success the work is acknowledged. A Faktory::Shutdown fails the work
# and is swallowed. Any other exception is passed to handle_exception,
# fails the work and is then re-raised to the caller.
def process(work)
  job_hash = work.job
  begin
    dispatch(job_hash) do |worker|
      Faktory.worker_middleware.invoke(worker, job_hash) { worker.perform(*job_hash["args"]) }
    end
    work.acknowledge
  rescue Faktory::Shutdown => shutdown
    # The job was force-killed because it did not finish within the
    # timeout. Failing it releases any server-side locks so that it can
    # be restarted immediately.
    work.fail(shutdown)
  rescue Exception => error # rubocop:disable Lint/RescueException
    handle_exception(error, {context: "Job raised exception", job: work.job})
    work.fail(error)
    raise error
  end
end
# Short base-36 identifier derived from the current thread's object_id.
# The value is computed on the first call and cached in @str, so later
# calls return the same string regardless of the calling thread.
def thread_identity
  return @str if @str

  @str = Thread.current.object_id.to_s(36)
end
# Resolves a job type name such as "Foo::BarJob" (a leading "::" is
# allowed) to the constant it names.
#
# Every segment is looked up strictly inside the previous namespace.
# With the default inherited lookup, "Foo::String" would silently resolve
# to the top-level ::String whenever Foo does not define String itself,
# so a mistyped or stale job type could instantiate an unrelated class.
#
# str - the constant path as a String.
#
# Returns the resolved constant (Object itself for an empty string).
# Raises NameError when a segment cannot be resolved.
def constantize(str)
  names = str.split("::")
  names.shift if names.empty? || names.first.empty?

  names.inject(Object) do |namespace, name|
    # inherit = false limits the search to +namespace+. const_get still
    # falls back to namespace.const_missing, so autoloaders keep working.
    namespace.const_get(name, false)
  end
end
end
end