Commit

Merge pull request #424 from akira/reenqueue-from-backup
re-enqueue unfinished jobs to the beginning of the queue on restart
ananthakumaran committed Aug 15, 2020
2 parents 3f4b5c5 + 002fefc commit 548bd9e
Showing 7 changed files with 159 additions and 45 deletions.
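At a glance: the commit replaces the one-job-per-round-trip RPOPLPUSH recovery (the rpoplpush/3 helper removed from connection.ex below) with a Lua script, :mlpop_rpush, which LPOPs up to 10 jobs from a node's backup list and RPUSHes them onto the live queue in a single call, repeating until the backup list is empty. Exq follows the Sidekiq convention of LPUSHing new jobs and consuming from the other end of the list, so RPUSH places the recovered jobs next in line; that is the "beginning of queue" the title refers to. The following standalone sketch mirrors the script added in script.ex but drives it with a plain Redix connection; the module name, drain/4, and the raw EVAL call are illustrative assumptions, not exq's API.

defmodule BackupDrainSketch do
  @mlpop_rpush """
  local from, to = KEYS[1], KEYS[2]
  local limit = tonumber(ARGV[1])
  local length = redis.call('LLEN', from)
  local moved = 0
  while limit > 0 and length > 0 do
    local value = redis.call('LPOP', from)
    redis.call('RPUSH', to, value)
    limit = limit - 1
    length = length - 1
    moved = moved + 1
  end
  return {length, moved}
  """

  # Moves up to `limit` jobs per script call from `backup_key` to `queue_key`,
  # repeating until the backup list reports zero remaining entries.
  def drain(redis, backup_key, queue_key, limit \\ 10) do
    {:ok, [remaining, _moved]} =
      Redix.command(redis, [
        "EVAL",
        @mlpop_rpush,
        "2",
        backup_key,
        queue_key,
        Integer.to_string(limit)
      ])

    if remaining > 0, do: drain(redis, backup_key, queue_key, limit), else: :ok
  end
end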
lib/exq/redis/connection.ex: 6 changes (1 addition & 5 deletions)
@@ -103,10 +103,6 @@ defmodule Exq.Redis.Connection do
q(redis, ["LPOP", key])
end

def rpoplpush(redis, key, backup) do
q(redis, ["RPOPLPUSH", key, backup])
end

def zadd(redis, set, score, member) do
q(redis, ["ZADD", set, score, member])
end
@@ -184,7 +180,7 @@ defmodule Exq.Redis.Connection do
defp handle_response({:error, %{message: "NOSCRIPT" <> _rest}} = error, _) do
error
end

defp handle_response({:error, %Redix.ConnectionError{reason: :disconnected}} = error, _) do
error
end
lib/exq/redis/heartbeat.ex: 10 changes (6 additions & 4 deletions)
@@ -46,16 +46,18 @@ defmodule Exq.Redis.Heartbeat do
JobQueue.queue_key(namespace, queue),
sorted_set_key(namespace)
],
[node_id, current_score]
[node_id, current_score, 10]
)

case resp do
{:ok, job} ->
if String.valid?(job) do
{:ok, [remaining, moved]} ->
if moved > 0 do
Logger.info(
"Re-enqueueing job from backup for node_id [#{node_id}] and queue [#{queue}]"
"Re-enqueued #{moved} job(s) from backup for node_id [#{node_id}] and queue [#{queue}]"
)
end

if remaining > 0 do
re_enqueue_backup(redis, namespace, node_id, queue, current_score)
end

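The extra arguments above ([node_id, current_score, 10]) matter: the heartbeat_re_enqueue_backup script (see script.ex below) first checks that node_id is still parked at current_score in the heartbeat sorted set, i.e. that the node really stopped heartbeating, before it moves anything, and the 10 caps how many jobs one invocation moves. Below is a hedged illustration of that guard expressed as separate Redix calls; the real script performs the check and the LPOP/RPUSH loop atomically inside Lua, and the function here is illustrative, not part of exq.

defmodule DeadNodeGuardSketch do
  # True when `node_id` still sits at exactly `expected_score` in the heartbeat
  # sorted set, meaning its heartbeat has not advanced since it was last observed.
  def dead_node?(redis, heartbeat_key, node_id, expected_score) do
    {:ok, node_ids} =
      Redix.command(redis, [
        "ZRANGEBYSCORE",
        heartbeat_key,
        to_string(expected_score),
        to_string(expected_score)
      ])

    node_id in node_ids
  end
end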
lib/exq/redis/job_queue.ex: 18 changes (11 additions & 7 deletions)
@@ -14,6 +14,7 @@ defmodule Exq.Redis.JobQueue do
require Logger

alias Exq.Redis.Connection
alias Exq.Redis.Script
alias Exq.Support.Job
alias Exq.Support.Config
alias Exq.Support.Time
@@ -123,19 +124,22 @@ defmodule Exq.Redis.JobQueue do

def re_enqueue_backup(redis, namespace, node_id, queue) do
resp =
redis
|> Connection.rpoplpush(
backup_queue_key(namespace, node_id, queue),
queue_key(namespace, queue)
Script.eval!(
redis,
:mlpop_rpush,
[backup_queue_key(namespace, node_id, queue), queue_key(namespace, queue)],
[10]
)

case resp do
{:ok, job} ->
if String.valid?(job) do
{:ok, [remaining, moved]} ->
if moved > 0 do
Logger.info(
"Re-enqueueing job from backup for node_id [#{node_id}] and queue [#{queue}]"
"Re-enqueued #{moved} job(s) from backup for node_id [#{node_id}] and queue [#{queue}]"
)
end

if remaining > 0 do
re_enqueue_backup(redis, namespace, node_id, queue)
end

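The {remaining, moved} reply is what lets re_enqueue_backup/4 stay recursive but bounded: each script call shifts at most 10 jobs and reports how many are left, so a backlog of 15 jobs (the case exercised by the tests further down) takes exactly two calls. The small pure-Elixir simulation below only illustrates that arithmetic and shares no code with exq.

defmodule BatchReplySketch do
  # Returns the sequence of {remaining, moved} replies a backlog of `n` jobs
  # would produce with a per-call batch limit of `limit`.
  def replies(n, limit \\ 10) when n >= 0 do
    moved = min(n, limit)
    remaining = n - moved

    if remaining > 0 do
      [{remaining, moved} | replies(remaining, limit)]
    else
      [{remaining, moved}]
    end
  end
end

# BatchReplySketch.replies(15) #=> [{5, 10}, {0, 5}], i.e. two script calls.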
lib/exq/redis/script.ex: 32 changes (29 additions & 3 deletions)
@@ -12,6 +12,22 @@ defmodule Exq.Redis.Script do
end

@scripts %{
mlpop_rpush:
Prepare.script("""
local from, to = KEYS[1], KEYS[2]
local limit = tonumber(ARGV[1])
local length = redis.call('LLEN', from)
local value = nil
local moved = 0
while limit > 0 and length > 0 do
value = redis.call('LPOP', from)
redis.call('RPUSH', to, value)
limit = limit - 1
length = length - 1
moved = moved + 1
end
return {length, moved}
"""),
heartbeat_re_enqueue_backup:
Prepare.script("""
local function contains(table, element)
@@ -24,12 +24,40 @@
end
local backup_queue_key, queue_key, heartbeat_key = KEYS[1], KEYS[2], KEYS[3]
local node_id, expected_score = ARGV[1], ARGV[2]
local node_id, expected_score, limit = ARGV[1], ARGV[2], tonumber(ARGV[3])
local node_ids = redis.call('ZRANGEBYSCORE', heartbeat_key, expected_score, expected_score)
if contains(node_ids, node_id) then
return redis.call('RPOPLPUSH', backup_queue_key, queue_key)
local length = redis.call('LLEN', backup_queue_key)
local value = nil
local moved = 0
while limit > 0 and length > 0 do
value = redis.call('LPOP', backup_queue_key)
redis.call('RPUSH', queue_key, value)
limit = limit - 1
length = length - 1
moved = moved + 1
end
return {length, moved}
else
return nil
return {0, 0}
end
""")
}
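A script prepared this way is normally sent to Redis once and re-run by its SHA1 afterwards, which is presumably why connection.ex keeps a handle_response clause for NOSCRIPT errors (visible in the first hunk above). The sketch below shows that EVALSHA-with-EVAL-fallback pattern in hedged form; exq's actual Script.eval!/4 may differ, and the module and function names here are illustrative.

defmodule PreparedScriptSketch do
  def eval(redis, source, keys, args) do
    sha = :crypto.hash(:sha, source) |> Base.encode16(case: :lower)
    tail = [Integer.to_string(length(keys))] ++ keys ++ Enum.map(args, &to_string/1)

    case Redix.command(redis, ["EVALSHA", sha] ++ tail) do
      {:error, %Redix.Error{message: "NOSCRIPT" <> _rest}} ->
        # The server has not cached this script yet: send the full source once;
        # EVAL also stores it so later EVALSHA calls succeed.
        Redix.command(redis, ["EVAL", source] ++ tail)

      other ->
        other
    end
  end
end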
test/exq/heartbeat/monitor_test.exs: 30 changes (30 additions & 0 deletions)
@@ -51,6 +51,36 @@ defmodule Exq.Heartbeat.MonitorTest do
assert queue_length(redis, "3") == {:ok, 0}
end

test "re-enqueues more than 10 orphaned jobs from dead node's backup queue" do
{:ok, _} = Exq.Stats.Server.start_link(@opts)
redis = :testredis

servers =
for i <- 1..5 do
{:ok, heartbeat} =
Exq.Heartbeat.Server.start_link(Keyword.put(@opts, :node_id, to_string(i)))

{:ok, monitor} =
Exq.Heartbeat.Monitor.start_link(Keyword.put(@opts, :node_id, to_string(i)))

%{heartbeat: heartbeat, monitor: monitor}
end

for i <- 1..15 do
assert {:ok, ^i} = working(redis, "3")
end

Process.sleep(1000)
assert alive_nodes(redis) == ["1", "2", "3", "4", "5"]
assert queue_length(redis, "3") == {:ok, 15}
server = Enum.at(servers, 2)
:ok = GenServer.stop(server.heartbeat)
Process.sleep(2000)

assert alive_nodes(redis) == ["1", "2", "4", "5"]
assert queue_length(redis, "3") == {:ok, 0}
end

test "can handle connection failure" do
with_application_env(:exq, :redis_timeout, 500, fn ->
{:ok, _} = Exq.Stats.Server.start_link(@opts)
test/job_queue_test.exs: 50 changes (40 additions & 10 deletions)
@@ -20,10 +20,17 @@ defmodule JobQueueTest do
jobs = JobQueue.dequeue(:testredis, "test", @host, queues)
result = jobs |> Enum.reject(fn {:ok, {status, _}} -> status == :none end)

if is_boolean(expected_result) do
assert expected_result == !Enum.empty?(result)
else
assert expected_result == Enum.count(result)
cond do
is_boolean(expected_result) ->
assert expected_result == !Enum.empty?(result)

is_integer(expected_result) ->
assert expected_result == Enum.count(result)

is_map(expected_result) ->
[{:ok, {job_string, _queue}}] = result
job = Jason.decode!(job_string)
assert expected_result == Map.take(job, Map.keys(expected_result))
end
end

@@ -43,14 +50,37 @@
end

test "backup queue" do
JobQueue.enqueue(:testredis, "test", "default", MyWorker, [], [])
JobQueue.enqueue(:testredis, "test", "default", MyWorker, [], [])
assert_dequeue_job(["default"], true)
assert_dequeue_job(["default"], true)
JobQueue.enqueue(:testredis, "test", "default", MyWorker, [1], [])
JobQueue.enqueue(:testredis, "test", "default", MyWorker, [2], [])
assert_dequeue_job(["default"], %{"args" => [1]})
assert_dequeue_job(["default"], %{"args" => [2]})
assert_dequeue_job(["default"], false)
JobQueue.enqueue(:testredis, "test", "default", MyWorker, [3], [])
JobQueue.enqueue(:testredis, "test", "default", MyWorker, [4], [])
JobQueue.re_enqueue_backup(:testredis, "test", @host, "default")
assert_dequeue_job(["default"], true)
assert_dequeue_job(["default"], true)
assert_dequeue_job(["default"], %{"args" => [1]})
assert_dequeue_job(["default"], %{"args" => [2]})
assert_dequeue_job(["default"], %{"args" => [3]})
assert_dequeue_job(["default"], %{"args" => [4]})
assert_dequeue_job(["default"], false)
end

test "backup queue re enqueues all jobs" do
for i <- 1..15 do
JobQueue.enqueue(:testredis, "test", "default", MyWorker, [i], [])
assert_dequeue_job(["default"], %{"args" => [i]})
end

for i <- 16..30 do
JobQueue.enqueue(:testredis, "test", "default", MyWorker, [i], [])
end

JobQueue.re_enqueue_backup(:testredis, "test", @host, "default")

for i <- 1..30 do
assert_dequeue_job(["default"], %{"args" => [i]})
end

assert_dequeue_job(["default"], false)
end

test/worker_test.exs: 58 changes (42 additions & 16 deletions)
@@ -160,27 +160,53 @@ defmodule WorkerTest do
job = "{ \"queue\": \"default\", \"class\": \"#{class}\", \"args\": #{args} }"

work_table = :ets.new(:work_table, [:set, :public])
{:ok, stub_server} = WorkerTest.MockServer.start_link()
{:ok, mock_stats_server} = GenServer.start_link(WorkerTest.MockStatsServer, %{})
{:ok, middleware} = GenServer.start_link(Exq.Middleware.Server, [])
{:ok, metadata} = Exq.Worker.Metadata.start_link(%{})

{:ok, stub_server} =
start_supervised(%{
id: WorkerTest.MockServer,
start: {WorkerTest.MockServer, :start_link, []}
})

{:ok, mock_stats_server} =
start_supervised(%{
id: WorkerTest.MockStatsServer,
start: {GenServer, :start_link, [WorkerTest.MockStatsServer, %{}]}
})

{:ok, middleware} =
start_supervised(%{
id: Exq.Middleware.Server,
start: {GenServer, :start_link, [Exq.Middleware.Server, []]}
})

{:ok, metadata} =
start_supervised(%{
id: Exq.Worker.Metadata,
start: {Exq.Worker.Metadata, :start_link, [%{}]}
})

Exq.Middleware.Server.push(middleware, Exq.Middleware.Stats)
Exq.Middleware.Server.push(middleware, Exq.Middleware.Job)
Exq.Middleware.Server.push(middleware, Exq.Middleware.Manager)
Exq.Middleware.Server.push(middleware, Exq.Middleware.Logger)

Exq.Worker.Server.start_link(
job,
stub_server,
"default",
work_table,
mock_stats_server,
"exq",
"localhost",
stub_server,
middleware,
metadata
)
start_supervised(%{
id: Exq.Worker.Server,
start:
{Exq.Worker.Server, :start_link,
[
job,
stub_server,
"default",
work_table,
mock_stats_server,
"exq",
"localhost",
stub_server,
middleware,
metadata
]}
})
end

test "execute valid job with perform" do
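The restructuring above swaps bare start_link calls for ExUnit's start_supervised/1 child specs, so every collaborator is owned by the test supervisor and shut down automatically (in reverse start order) when the test exits, instead of lingering between tests. A minimal self-contained illustration of the same pattern follows; the Agent child is a stand-in, not exq code.

defmodule StartSupervisedSketch do
  use ExUnit.Case, async: true

  test "children started via start_supervised/1 are cleaned up with the test" do
    {:ok, counter} =
      start_supervised(%{
        id: :counter,
        start: {Agent, :start_link, [fn -> 0 end]}
      })

    Agent.update(counter, &(&1 + 1))
    assert Agent.get(counter, & &1) == 1
    # No manual GenServer.stop/teardown needed: ExUnit stops supervised
    # children as part of the test's exit.
  end
end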
