/
notifier.rb
184 lines (156 loc) · 6.33 KB
/
notifier.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# frozen_string_literal: true
require 'concurrent/atomic/atomic_boolean'
module GoodJob # :nodoc:
#
# Notifiers hook into Postgres LISTEN/NOTIFY functionality to emit and listen for notifications across processes.
#
# Notifiers can emit NOTIFY messages through Postgres.
# A notifier will LISTEN for messages by creating a background thread that runs in an instance of +Concurrent::ThreadPoolExecutor+.
# When a message is received, the notifier passes the message to each of its recipients.
#
class Notifier
# Raised if the Database adapter does not implement LISTEN.
AdapterCannotListenError = Class.new(StandardError)
# Default Postgres channel for LISTEN/NOTIFY
CHANNEL = 'good_job'
# Defaults for instance of Concurrent::ThreadPoolExecutor
EXECUTOR_OPTIONS = {
name: name,
min_threads: 0,
max_threads: 1,
auto_terminate: true,
idletime: 60,
max_queue: 1,
fallback_policy: :discard,
}.freeze
# Seconds to wait if database cannot be connected to
RECONNECT_INTERVAL = 5
# Seconds to block while LISTENing for a message
WAIT_INTERVAL = 1
# @!attribute [r] instances
# @!scope class
# List of all instantiated Notifiers in the current process.
# @return [Array<GoodJob::Notifier>, nil]
cattr_reader :instances, default: [], instance_reader: false
# Send a message via Postgres NOTIFY
# @param message [#to_json]
def self.notify(message)
connection = Job.connection
connection.exec_query <<~SQL.squish
NOTIFY #{CHANNEL}, #{connection.quote(message.to_json)}
SQL
end
# List of recipients that will receive notifications.
# @return [Array<#call, Array(Object, Symbol)>]
attr_reader :recipients
# @param recipients [Array<#call, Array(Object, Symbol)>]
def initialize(*recipients)
@recipients = Concurrent::Array.new(recipients)
@listening = Concurrent::AtomicBoolean.new(false)
self.class.instances << self
create_executor
listen
end
# Tests whether the notifier is active and listening for new messages.
# @return [true, false, nil]
def listening?
@listening.true?
end
# Tests whether the notifier is running.
# @!method running?
# @return [true, false, nil]
delegate :running?, to: :executor, allow_nil: true
# Tests whether the scheduler is shutdown.
# @!method shutdown?
# @return [true, false, nil]
delegate :shutdown?, to: :executor, allow_nil: true
# Shut down the notifier.
# This stops the background LISTENing thread.
# Use {#shutdown?} to determine whether threads have stopped.
# @param timeout [Numeric, nil] Seconds to wait for active threads.
# * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
# * +-1+, the scheduler will wait until the shutdown is complete.
# * +0+, the scheduler will immediately shutdown and stop any threads.
# * A positive number will wait that many seconds before stopping any remaining active threads.
# @return [void]
def shutdown(timeout: -1)
return if executor.nil? || executor.shutdown?
executor.shutdown if executor.running?
if executor.shuttingdown? && timeout # rubocop:disable Style/GuardClause
executor_wait = timeout.negative? ? nil : timeout
executor.kill unless executor.wait_for_termination(executor_wait)
end
end
# Restart the notifier.
# When shutdown, start; or shutdown and start.
# @param timeout [nil, Numeric] Seconds to wait; shares same values as {#shutdown}.
# @return [void]
def restart(timeout: -1)
shutdown(timeout: timeout) if running?
create_executor
listen
end
# Invoked on completion of ThreadPoolExecutor task
# @!visibility private
# @return [void]
def listen_observer(_time, _result, thread_error)
return if thread_error.is_a? AdapterCannotListenError
if thread_error
GoodJob.on_thread_error.call(thread_error) if GoodJob.on_thread_error.respond_to?(:call)
ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: thread_error })
end
return if shutdown?
if thread_error.is_a?(ActiveRecord::ConnectionNotEstablished) || thread_error.is_a?(ActiveRecord::StatementInvalid)
listen(delay: RECONNECT_INTERVAL)
else
listen
end
end
private
attr_reader :executor
def create_executor
@executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS)
end
def listen(delay: 0)
future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
with_listen_connection do |conn|
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
conn.async_exec("LISTEN #{CHANNEL}").clear
end
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
thr_listening.make_true
while thr_executor.running?
conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
next unless channel == CHANNEL
ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
parsed_payload = JSON.parse(payload, symbolize_names: true)
thr_recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name, parsed_payload)
end
end
end
end
ensure
thr_listening.make_false
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
conn.async_exec("UNLISTEN *").clear
end
end
end
future.add_observer(self, :listen_observer)
future.execute
end
def with_listen_connection
ar_conn = Job.connection_pool.checkout.tap do |conn|
Job.connection_pool.remove(conn)
end
pg_conn = ar_conn.raw_connection
raise AdapterCannotListenError unless pg_conn.respond_to? :wait_for_notify
pg_conn.async_exec("SET application_name = #{pg_conn.escape_identifier(self.class.name)}").clear
yield pg_conn
ensure
ar_conn&.disconnect!
end
end
end