/
heartbeat.ex
85 lines (72 loc) · 2.24 KB
/
heartbeat.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
defmodule Exq.Redis.Heartbeat do
  @moduledoc """
  Heartbeat bookkeeping for nodes, stored in a Redis sorted set.

  Each node is a member of the `"<namespace>:heartbeats"` sorted set, scored
  with the Unix time (fractional seconds) at which it last called
  `register/3`. `dead_nodes/4` lists members whose score is older than a
  cutoff, and `re_enqueue_backup/5` runs the `:heartbeat_re_enqueue_backup`
  script to move jobs from a node's backup queue back to the main queue.
  """

  require Logger

  alias Exq.Redis.Connection
  alias Exq.Redis.JobQueue
  alias Exq.Redis.Script

  @doc """
  Records a heartbeat for `node_id`, scored with the current UTC time.

  The previous entry is removed and the new one added inside a single
  `MULTI`/`EXEC` pipeline.

  Returns `:ok` when the reply has the expected shape (in particular `ZADD`
  reporting exactly one added member). Any other reply is logged and returned
  unchanged.
  """
  def register(redis, namespace, node_id) do
    key = sorted_set_key(namespace)

    commands = [
      ["MULTI"],
      ["ZREM", key, node_id],
      ["ZADD", key, now_in_seconds(), node_id],
      ["EXEC"]
    ]

    case Connection.qp(redis, commands) do
      # ZREM runs first, so ZADD always inserts a fresh member and must report 1.
      {:ok, ["OK", "QUEUED", "QUEUED", [_removed, 1]]} ->
        :ok

      error ->
        Logger.error("Failed to send heartbeat. Unexpected error from redis: #{inspect(error)}")
        error
    end
  end

  @doc """
  Removes the heartbeat entry of `node_id`.

  Returns `:ok` for any `{:ok, _}` reply. Any other reply is logged and
  returned unchanged.
  """
  def unregister(redis, namespace, node_id) do
    case Connection.zrem(redis, sorted_set_key(namespace), node_id) do
      {:ok, _} ->
        :ok

      error ->
        Logger.error(
          "Failed to clear old heartbeat. Unexpected error from redis: #{inspect(error)}"
        )

        error
    end
  end

  @doc """
  Moves jobs from the backup queue of `node_id` for `queue` back to the queue.

  Runs the `:heartbeat_re_enqueue_backup` script repeatedly for as long as it
  reports remaining entries. `current_score` is passed through to the script
  together with `node_id`.

  Always returns `nil`. A reply that is not `{:ok, [remaining, moved]}` is
  logged and stops the loop.
  """
  def re_enqueue_backup(redis, namespace, node_id, queue, current_score) do
    keys = [
      JobQueue.backup_queue_key(namespace, node_id, queue),
      JobQueue.queue_key(namespace, queue),
      sorted_set_key(namespace)
    ]

    # NOTE(review): the trailing 10 looks like the script's per-call batch
    # limit (the loop below re-runs while entries remain) - confirm against
    # the script source.
    case Script.eval!(redis, :heartbeat_re_enqueue_backup, keys, [node_id, current_score, 10]) do
      {:ok, [remaining, moved]} ->
        if moved > 0 do
          Logger.info(
            "Re-enqueued #{moved} job(s) from backup for node_id [#{node_id}] and queue [#{queue}]"
          )
        end

        if remaining > 0 do
          re_enqueue_backup(redis, namespace, node_id, queue, current_score)
        end

      error ->
        # Log instead of dropping the reply silently, in line with
        # register/3 and unregister/3. The return value stays nil.
        Logger.error(
          "Failed to re-enqueue backup for node_id [#{node_id}] and queue [#{queue}]. " <>
            "Unexpected response from redis: #{inspect(error)}"
        )

        nil
    end
  end

  @doc """
  Lists the nodes whose last heartbeat is older than the allowed window.

  The cutoff is `now - interval / 1000 * (missed_heartbeats_allowed + 1)`,
  clamped at 0. `interval` is presumably given in milliseconds, since it is
  divided by 1000 before being subtracted from a score in seconds.

  Returns `{:ok, map}` where the map pairs each returned member with its
  score (consecutive elements of the reply are taken as `member, score`).
  Any reply other than `{:ok, results}` is returned unchanged.
  """
  def dead_nodes(redis, namespace, interval, missed_heartbeats_allowed) do
    cutoff = max(0, now_in_seconds() - interval / 1000 * (missed_heartbeats_allowed + 1))

    with {:ok, results} <-
           Connection.zrangebyscorewithscore(redis, sorted_set_key(namespace), 0, cutoff) do
      node_scores =
        results
        |> Enum.chunk_every(2)
        |> Map.new(fn [node_id, score] -> {node_id, score} end)

      {:ok, node_scores}
    end
  end

  # Key of the sorted set that holds one heartbeat entry per node.
  defp sorted_set_key(namespace) do
    "#{namespace}:heartbeats"
  end

  # Current UTC time as fractional Unix seconds (millisecond precision).
  defp now_in_seconds do
    DateTime.to_unix(DateTime.utc_now(), :millisecond) / 1000
  end
end