Skip to content

Commit

Permalink
api to retry job in retry queue immediatly
Browse files Browse the repository at this point in the history
  • Loading branch information
ananthakumaran committed Dec 4, 2021
1 parent 23e3de6 commit 6a3c01f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 0 deletions.
15 changes: 15 additions & 0 deletions lib/exq/api.ex
Expand Up @@ -375,6 +375,21 @@ defmodule Exq.Api do
GenServer.call(pid, :clear_retries)
end

@doc """
Re enqueue jobs from retry queue immediatly.
Expected args:
* `pid` - Exq.Api process
* `raw_job` - raw json encoded job value
Returns:
* `{:ok, enqueued}`
"""
def dequeue_retry_jobs(pid, raw_jobs) do
GenServer.call(pid, {:dequeue_retry_jobs, raw_jobs})
end

@doc """
Number of jobs in the retry queue.
Expand Down
5 changes: 5 additions & 0 deletions lib/exq/api/server.ex
Expand Up @@ -170,6 +170,11 @@ defmodule Exq.Api.Server do
{:reply, :ok, state}
end

def handle_call({:dequeue_retry_jobs, raw_jobs}, _from, state) do
result = JobQueue.dequeue_retry_jobs(state.redis, state.namespace, raw_jobs)
{:reply, result, state}
end

def handle_call({:remove_scheduled, jid}, _from, state) do
JobQueue.remove_scheduled(state.redis, state.namespace, jid)
{:reply, :ok, state}
Expand Down
8 changes: 8 additions & 0 deletions lib/exq/redis/job_queue.ex
Expand Up @@ -398,6 +398,10 @@ defmodule Exq.Redis.JobQueue do
Connection.zrem!(redis, retry_queue_key(namespace), raw_jobs)
end

def dequeue_retry_jobs(redis, namespace, raw_jobs) do
dequeue_scheduled_jobs(redis, namespace, retry_queue_key(namespace), raw_jobs)
end

def remove_scheduled(redis, namespace, jid) do
{:ok, job} = find_job(redis, namespace, jid, :scheduled, false)
Connection.zrem!(redis, scheduled_queue_key(namespace), job)
Expand Down Expand Up @@ -492,6 +496,10 @@ defmodule Exq.Redis.JobQueue do
{jid, Config.serializer().encode!(job)}
end

defp dequeue_scheduled_jobs(redis, namespace, queue_key, raw_jobs) do
Script.eval!(redis, :scheduler_dequeue_jobs, [queue_key, full_key(namespace, "")], raw_jobs)
end

defp get_max_retries do
:max_retries
|> Config.get()
Expand Down
16 changes: 16 additions & 0 deletions lib/exq/redis/script.ex
Expand Up @@ -12,6 +12,22 @@ defmodule Exq.Redis.Script do
end

@scripts %{
scheduler_dequeue_jobs:
Prepare.script("""
local schedule_queue, namespace_prefix = KEYS[1], KEYS[2]
local jobs = ARGV
local dequeued = 0
for _, job in ipairs(jobs) do
local job_queue = cjson.decode(job)['queue']
local count = redis.call('ZREM', schedule_queue, job)
if count == 1 then
redis.call('SADD', namespace_prefix .. 'queues', job_queue)
redis.call('LPUSH', namespace_prefix .. 'queue:' .. job_queue, job)
dequeued = dequeued + 1
end
end
return dequeued
"""),
scheduler_dequeue:
Prepare.script("""
local schedule_queue = KEYS[1]
Expand Down
18 changes: 18 additions & 0 deletions test/api_test.exs
Expand Up @@ -252,6 +252,24 @@ defmodule ApiTest do
assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid)
end

test "re enqueue jobs in retry queue" do
jid = "1234"

JobQueue.retry_job(
:testredis,
'test',
%Job{jid: "1234", queue: "test"},
1,
"this is an error"
)

{:ok, [raw_job]} = Exq.Api.retries(Exq.Api, raw: true)
assert {:ok, 1} = Exq.Api.dequeue_retry_jobs(Exq.Api, [raw_job])
assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid)

assert {:ok, 0} = Exq.Api.dequeue_retry_jobs(Exq.Api, [raw_job])
end

test "remove job in scheduled queue" do
{:ok, jid} = Exq.enqueue_in(Exq, 'custom', 1000, Bogus, [])
Exq.Api.remove_scheduled(Exq.Api, jid)
Expand Down

0 comments on commit 6a3c01f

Please sign in to comment.