defmodule Exq.Manager.Server do
  @moduledoc """
  The Manager module is the main orchestrator for the system.
  It is also the entry point process used by the client to interact
  with the Exq system.

  Its responsibilities include:
    * Handling interaction with the client and delegating to the responsible sub-system.
    * Initial setup of the Redis connection (to be moved to supervisor?).
    * Setup and tracking of in-progress workers / jobs.
    * Polling Redis for new jobs for any queues that have available workers.
    * Handling of queue state and subscriptions (addition and removal).
    * Initial re-hydration of the backup queue on system restart to handle any
      orphan jobs from the last system stop.

  The Manager is a GenServer with a timed process loop.
  ## Options
    * `:concurrency` - Default max number of workers to use if not passed in for each queue.
    * `:genserver_timeout` - Timeout to use for GenServer calls.
    * `:max_retries` - Maximum number of times to retry a failed job.
    * `:name` - Name of the target registered process.
    * `:namespace` - Redis namespace to store all data under. Defaults to "exq".
    * `:queues` - List of queues to monitor. Can be a list of queue names such as ["q1", "q2"], or
      a list of tuples with the queue name and max number of concurrent workers: [{"q1", 1}, {"q2", 20}].
      If only a list of names is passed in, the system will use the default `concurrency` value for each queue.
    * `:redis_timeout` - Timeout to use for Redis commands.
    * `:poll_timeout` - How often to poll Redis for jobs.
    * `:scheduler_enable` - Whether the scheduler / retry process should be enabled. Defaults
      to true. Note that if you turn this off, job retries will not be enqueued.
    * `:scheduler_poll_timeout` - How often to poll Redis for scheduled / retry jobs.
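  For example, an application might set these options as follows (an illustrative
  sketch only; the values shown are not defaults and are application-specific):

      config :exq,
        name: Exq,
        namespace: "exq",
        concurrency: 10,
        queues: [{"q1", 1}, {"q2", 20}],
        poll_timeout: 50,
        scheduler_enable: true,
        scheduler_poll_timeout: 200,
        max_retries: 25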
  ## Redis Options (TODO - move to supervisor after refactor):
    * `:host` - Host name for the Redis server (defaults to "127.0.0.1").
    * `:port` - Redis port (defaults to 6379).
    * `:database` - Redis database number (used for isolation, defaults to 0).
    * `:password` - Redis authentication password (optional, off by default).
    * `:redis_options` - Additional options passed through to Redix.
    * TODO: What about max_reconnection_attempts?
  ## Job lifecycle

  The job lifecycle starts with the enqueueing of a job. This can be done either
  via Exq or another system such as Sidekiq / Resque.
  Note that the JobQueue module encapsulates much of this logic.

      Client (Exq) -> Manager -> Enqueuer

  Assuming Exq is used to enqueue an immediate job, the flow is as follows:
    1. The client calls Exq.enqueue(Exq, "queue_name", Worker, ["arg1", "arg2"]).
    2. The Manager delegates to the Enqueuer.
    3. The Enqueuer then:
      * Adds the queue to the "queues" list if it is not already there.
      * Prepares a job struct with a generated UUID and converts it to JSON.
      * Pushes the job onto the correct queue.
      * Responds to the client with the generated job UUID.

  At this point the job is in the correct queue, ready to be dequeued.
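  A minimal client-side call for the enqueue flow above might look like this
  (the worker module and arguments are hypothetical):

      {:ok, jid} = Exq.enqueue(Exq, "default", MyApp.Worker, ["arg1"])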
      Manager deq Redis -> Worker (decode & execute job) --> Manager (record)
                                   |
                                   --> Stats (record stats)

  The dequeueing of a job proceeds as follows:
    1. The Manager is on a polling cycle, and the :timeout message fires.
    2. The Manager tabulates a list of active queues with available workers.
    3. It uses the JobQueue module to fetch jobs. The JobQueue module does this through
       a single MULTI of RPOPLPUSH commands issued to Redis for the targeted queues.
       This command atomically pops an item off the queue and stores the item in a backup queue.
       The backup queue is keyed off the queue and node id, so each node has
       its own backup queue.
       Note that we cannot use a blocking pop, since BRPOPLPUSH (unlike BRPOP) is more
       limited and can only handle a single queue target (see filed issues in Redis / Sidekiq).
    4. Once the jobs are returned to the Manager, it goes through each job,
       creates an ephemeral Worker process for it, and kicks that process off.
       The Manager also does some tabulation to reduce the available worker count for those queues.
    5. The Worker parses the JSON object and figures out which worker module to call.
       It also tells Stats to record the job as in progress.
    6. The Worker then calls "apply" on the correct target module, and tracks the failure
       or success of the job. Once the job is finished, it tells the Manager and Stats.
    7. If the job is successful, the Manager and Stats simply mark the success of the job.
       If the job fails, the Worker module uses the JobQueue module to retry the job if necessary.
       The retry is done by adding the job to a "retry" queue, which is a Sorted Set in Redis.
       The job is marked with the retry count and scheduled date (using exponential backoff).
       The job is then removed from the backup queue.
    8. If any jobs were fetched from Redis, the Manager will poll again immediately; otherwise
       it will use poll_timeout for the next poll.
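  At the Redis level, step 3 boils down to commands of roughly this shape per
  queue (the key layout shown is illustrative; the actual keys are built by the
  JobQueue module from the namespace, node id, and queue name):

      RPOPLPUSH exq:queue:q1 exq:queue:backup::node_id::q1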
  ## Retry / Schedule queue

  The retry / schedule queue provides functionality for scheduled jobs. This is used both
  for the `enqueue_in` method, which schedules a job to run in the future, as well as
  for the retry queue, which is used to retry failed jobs.
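
  From the client side, scheduling a job for the future looks like this
  (delay in seconds; the worker module is hypothetical):

      {:ok, jid} = Exq.enqueue_in(Exq, "default", 300, MyApp.Worker, ["arg1"])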
"""
  require Logger
  use GenServer
  alias Exq.Support.Config
  alias Exq.Redis.JobQueue

  @backoff_mult 10

  defmodule State do
    defstruct redis: nil,
              stats: nil,
              enqueuer: nil,
              pid: nil,
              node_id: nil,
              namespace: nil,
              work_table: nil,
              queues: nil,
              poll_timeout: nil,
              scheduler_poll_timeout: nil,
              workers_sup: nil,
              middleware: nil,
              metadata: nil
  end
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: server_name(opts[:name]))
  end

  def job_terminated(exq, namespace, queue, job_serialized) do
    GenServer.cast(exq, {:job_terminated, namespace, queue, job_serialized})
    :ok
  end

  def server_name(nil), do: Config.get(:name)
  def server_name(name), do: name
  ## ===========================================================
  ## gen server callbacks
  ## ===========================================================

  def init(opts) do
    # Cleanup stale stats
    GenServer.cast(self(), :cleanup_host_stats)

    # Setup queues
    work_table = setup_queues(opts)

    state = %State{
      work_table: work_table,
      redis: opts[:redis],
      stats: opts[:stats],
      workers_sup: opts[:workers_sup],
      enqueuer: opts[:enqueuer],
      middleware: opts[:middleware],
      metadata: opts[:metadata],
      node_id: Config.node_identifier().node_id(),
      namespace: opts[:namespace],
      queues: opts[:queues],
      pid: self(),
      poll_timeout: opts[:poll_timeout],
      scheduler_poll_timeout: opts[:scheduler_poll_timeout]
    }

    check_redis_connection(opts)
    {:ok, state, 0}
  end
  def handle_call(:redis, _from, state) do
    {:reply, {state.redis, state.namespace}, state, 10}
  end

  def handle_call(:subscriptions, _from, state) do
    {:reply, {:ok, state.queues}, state, 0}
  end

  def handle_call({:subscribe, queue}, _from, state) do
    updated_state = add_queue(state, queue)
    {:reply, :ok, updated_state, 0}
  end

  def handle_call({:subscribe, queue, concurrency}, _from, state) do
    updated_state = add_queue(state, queue, concurrency)
    {:reply, :ok, updated_state, 0}
  end

  def handle_call({:unsubscribe, queue}, _from, state) do
    updated_state = remove_queue(state, queue)
    {:reply, :ok, updated_state, 0}
  end

  def handle_call(:unsubscribe_all, _from, state) do
    updated_state = remove_all_queues(state)
    {:reply, :ok, updated_state, 0}
  end

  def handle_cast({:re_enqueue_backup, queue}, state) do
    rescue_timeout(fn ->
      JobQueue.re_enqueue_backup(state.redis, state.namespace, state.node_id, queue)
    end)

    {:noreply, state, 0}
  end

  @doc """
  Cleanup host stats on boot
  """
  def handle_cast(:cleanup_host_stats, state) do
    rescue_timeout(fn ->
      Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id)
    end)

    {:noreply, state, 0}
  end

  def handle_cast({:job_terminated, _namespace, queue, _job_serialized}, state) do
    update_worker_count(state.work_table, queue, -1)
    {:noreply, state, 0}
  end

  def handle_info(:timeout, state) do
    {updated_state, timeout} = dequeue_and_dispatch(state)
    {:noreply, updated_state, timeout}
  end

  def handle_info(_info, state) do
    {:noreply, state, state.poll_timeout}
  end

  def terminate(_reason, _state) do
    :ok
  end
  ## ===========================================================
  ## Internal Functions
  ## ===========================================================

  @doc """
  Dequeue jobs and dispatch to workers
  """
  def dequeue_and_dispatch(state), do: dequeue_and_dispatch(state, available_queues(state))
  def dequeue_and_dispatch(state, []), do: {state, state.poll_timeout}

  def dequeue_and_dispatch(state, queues) do
    rescue_timeout({state, state.poll_timeout}, fn ->
      jobs = Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues)

      job_results = jobs |> Enum.map(fn potential_job -> dispatch_job(state, potential_job) end)

      cond do
        Enum.any?(job_results, fn status -> elem(status, 1) == :dispatch end) ->
          {state, 0}

        Enum.any?(job_results, fn status -> elem(status, 0) == :error end) ->
          Logger.error("Redis Error #{Kernel.inspect(job_results)}. Backing off...")
          {state, state.poll_timeout * @backoff_mult}

        true ->
          {state, state.poll_timeout}
      end
    end)
  end
  @doc """
  Returns list of active queues with free workers
  """
  def available_queues(state) do
    Enum.filter(state.queues, fn q ->
      [{_, concurrency, worker_count}] = :ets.lookup(state.work_table, q)
      worker_count < concurrency
    end)
  end
  @doc """
  Dispatch the job to a worker if one was dequeued.
  Also updates the worker count for the dispatched job's queue.
  """
  def dispatch_job(state, potential_job) do
    case potential_job do
      {:ok, {:none, _queue}} ->
        {:ok, :none}

      {:ok, {job, queue}} ->
        dispatch_job(state, job, queue)
        {:ok, :dispatch}

      {status, reason} ->
        {:error, {status, reason}}
    end
  end

  def dispatch_job(state, job, queue) do
    {:ok, worker} =
      Exq.Worker.Supervisor.start_child(
        state.workers_sup,
        [
          job,
          state.pid,
          queue,
          state.work_table,
          state.stats,
          state.namespace,
          state.node_id,
          state.redis,
          state.middleware,
          state.metadata
        ]
      )

    Exq.Worker.Server.work(worker)
    update_worker_count(state.work_table, queue, 1)
  end
  # Setup queues from options / configs.
  # The following is done:
  #   * Sets up the queues data structure with proper concurrency settings
  #   * Sets up the :ets table for tracking workers
  #   * Re-enqueues any in-progress jobs from the backup queues that were not finished
  #   * Returns the work table
  # TODO: Refactor the way queues are setup
  defp setup_queues(opts) do
    work_table = :ets.new(:work_table, [:set, :public])

    Enum.each(opts[:concurrency], fn queue_concurrency ->
      :ets.insert(work_table, queue_concurrency)
      GenServer.cast(self(), {:re_enqueue_backup, elem(queue_concurrency, 0)})
    end)

    work_table
  end

  defp add_queue(state, queue, concurrency \\ Config.get(:concurrency)) do
    queue_concurrency = {queue, concurrency, 0}
    :ets.insert(state.work_table, queue_concurrency)
    GenServer.cast(self(), {:re_enqueue_backup, queue})
    updated_queues = [queue | state.queues]
    %{state | queues: updated_queues}
  end

  defp remove_queue(state, queue) do
    :ets.delete(state.work_table, queue)
    updated_queues = List.delete(state.queues, queue)
    %{state | queues: updated_queues}
  end

  defp remove_all_queues(state) do
    true = :ets.delete_all_objects(state.work_table)
    %{state | queues: []}
  end

  defp update_worker_count(work_table, queue, delta) do
    :ets.update_counter(work_table, queue, {3, delta})
  rescue
    # The queue has been unsubscribed
    _error in ArgumentError -> :ok
  end
  @doc """
  Rescue GenServer timeout.
  """
  def rescue_timeout(f) do
    rescue_timeout(nil, f)
  end

  def rescue_timeout(fail_return, f) do
    try do
      f.()
    catch
      :exit, {:timeout, info} ->
        Logger.info("Manager timeout occurred #{Kernel.inspect(info)}")
        fail_return
    end
  end

  # Check the Redis connection using PING and raise an exception with a
  # user-friendly error message if Redis is down.
  defp check_redis_connection(opts) do
    try do
      {:ok, _} = Exq.Redis.Connection.q(opts[:redis], ~w(PING))
    catch
      err, reason ->
        opts = Exq.Support.Opts.redis_opts(opts) |> List.wrap() |> List.delete(:password)

        raise """
        \n\n\n#{String.duplicate("=", 100)}
        ERROR! Could not connect to Redis!

        Configuration passed in: #{inspect(opts)}
        Error: #{inspect(err)}
        Reason: #{inspect(reason)}

        Make sure Redis is running, and your configuration matches Redis settings.
        #{String.duplicate("=", 100)}
        """
    end
  end
end