diff --git a/lib/exq/api.ex b/lib/exq/api.ex index 5515088..676399c 100644 --- a/lib/exq/api.ex +++ b/lib/exq/api.ex @@ -90,13 +90,17 @@ defmodule Exq.Api do Expected args: * `pid` - Exq.Api process * `queue` - Queue name + * `options` + - size: (integer) size of list + - offset: (integer) start offset of the list + - raw: (boolean) whether to deserialize the job Returns: * `{:ok, [jobs]}` """ - def jobs(pid, queue) do - GenServer.call(pid, {:jobs, queue}) + def jobs(pid, queue, options \\ []) do + GenServer.call(pid, {:jobs, queue, options}) end @doc """ @@ -104,13 +108,17 @@ defmodule Exq.Api do Expected args: * `pid` - Exq.Api process + * `options` + - score: (boolean) whether to include job score + - size: (integer) size of list + - offset: (integer) start offset of the list Returns: * `{:ok, [jobs]}` """ - def retries(pid) do - GenServer.call(pid, :retries) + def retries(pid, options \\ []) do + GenServer.call(pid, {:retries, options}) end @doc """ @@ -118,13 +126,17 @@ defmodule Exq.Api do Expected args: * `pid` - Exq.Api process + * `options` + - score: (boolean) whether to include job score + - size: (integer) size of list + - offset: (integer) start offset of the list Returns: * `{:ok, [jobs]}` """ - def scheduled(pid) do - GenServer.call(pid, {:jobs, :scheduled}) + def scheduled(pid, options \\ []) do + GenServer.call(pid, {:jobs, :scheduled, options}) end @doc """ @@ -157,10 +169,27 @@ defmodule Exq.Api do * `:ok` """ + @deprecated "use remove_enqueued_jobs/3" def remove_job(pid, queue, jid) do GenServer.call(pid, {:remove_job, queue, jid}) end + @doc """ + Removes a job from the queue specified. + + Expected args: + * `pid` - Exq.Api process + * `queue` - The name of the queue to remove the job from + * `raw_job` - raw json encoded job value + + Returns: + * `:ok` + + """ + def remove_enqueued_jobs(pid, queue, raw_jobs) do + GenServer.call(pid, {:remove_enqueued_jobs, queue, raw_jobs}) + end + @doc """ A count of the number of jobs in the queue, for each queue. @@ -195,19 +224,42 @@ defmodule Exq.Api do Expected args: * `pid` - Exq.Api process + * `options` + - score: (boolean) whether to include job score + - size: (integer) size of list + - offset: (integer) start offset of the list Returns: * `{:ok, [jobs]}` """ - def failed(pid) do - GenServer.call(pid, :failed) + def failed(pid, options \\ []) do + GenServer.call(pid, {:failed, options}) end + @deprecated "use find_failed/4" def find_failed(pid, jid) do GenServer.call(pid, {:find_failed, jid}) end + @doc """ + Find failed job + + Expected args: + * `pid` - Exq.Api process + * `score` - Job score + * `jid` - Job jid + * `options` + - raw: (boolean) whether to deserialize the job + + Returns: + * `{:ok, job}` + + """ + def find_failed(pid, score, jid, options \\ []) do + GenServer.call(pid, {:find_failed, score, jid, options}) + end + @doc """ Removes a job in the queue of jobs that have failed and exceeded their retry count. @@ -219,10 +271,26 @@ defmodule Exq.Api do * `:ok` """ + @deprecated "use remove_failed_jobs/2" def remove_failed(pid, jid) do GenServer.call(pid, {:remove_failed, jid}) end + @doc """ + Removes jobs from dead queue. + + Expected args: + * `pid` - Exq.Api process + * `raw_job` - raw json encoded job value + + Returns: + * `:ok` + + """ + def remove_failed_jobs(pid, raw_jobs) do + GenServer.call(pid, {:remove_failed_jobs, raw_jobs}) + end + def clear_failed(pid) do GenServer.call(pid, :clear_failed) end @@ -241,10 +309,29 @@ defmodule Exq.Api do GenServer.call(pid, :failed_size) end + @deprecated "use find_retry/4" def find_retry(pid, jid) do GenServer.call(pid, {:find_retry, jid}) end + @doc """ + Find job in retry queue + + Expected args: + * `pid` - Exq.Api process + * `score` - Job score + * `jid` - Job jid + * `options` + - raw: (boolean) whether to deserialize the job + + Returns: + * `{:ok, job}` + + """ + def find_retry(pid, score, jid, options \\ []) do + GenServer.call(pid, {:find_retry, score, jid, options}) + end + @doc """ Removes a job in the retry queue from being enqueued again. @@ -256,10 +343,26 @@ defmodule Exq.Api do * `:ok` """ + @deprecated "use remove_retry_jobs/2" def remove_retry(pid, jid) do GenServer.call(pid, {:remove_retry, jid}) end + @doc """ + Removes jobs from retry queue. + + Expected args: + * `pid` - Exq.Api process + * `raw_job` - raw json encoded job value + + Returns: + * `:ok` + + """ + def remove_retry_jobs(pid, raw_jobs) do + GenServer.call(pid, {:remove_retry_jobs, raw_jobs}) + end + def clear_retries(pid) do GenServer.call(pid, :clear_retries) end @@ -278,10 +381,29 @@ defmodule Exq.Api do GenServer.call(pid, :retry_size) end + @deprecated "use find_scheduled/4" def find_scheduled(pid, jid) do GenServer.call(pid, {:find_scheduled, jid}) end + @doc """ + Find job in scheduled queue + + Expected args: + * `pid` - Exq.Api process + * `score` - Job score + * `jid` - Job jid + * `options` + - raw: (boolean) whether to deserialize the job + + Returns: + * `{:ok, job}` + + """ + def find_scheduled(pid, score, jid, options \\ []) do + GenServer.call(pid, {:find_scheduled, score, jid, options}) + end + @doc """ Removes a job scheduled to run in the future from being enqueued. @@ -293,10 +415,26 @@ defmodule Exq.Api do * `:ok` """ + @deprecated "use remove_scheduled_jobs/2" def remove_scheduled(pid, jid) do GenServer.call(pid, {:remove_scheduled, jid}) end + @doc """ + Removes jobs from scheduled queue. + + Expected args: + * `pid` - Exq.Api process + * `raw_job` - raw json encoded job value + + Returns: + * `:ok` + + """ + def remove_scheduled_jobs(pid, raw_jobs) do + GenServer.call(pid, {:remove_scheduled_jobs, raw_jobs}) + end + def clear_scheduled(pid) do GenServer.call(pid, :clear_scheduled) end @@ -333,8 +471,14 @@ defmodule Exq.Api do GenServer.call(pid, {:stats, key}) end + def stats(pid, key, dates) when is_list(dates) do + GenServer.call(pid, {:stats, key, dates}) + end + def stats(pid, key, date) do - GenServer.call(pid, {:stats, key, date}) + with {:ok, [count]} <- GenServer.call(pid, {:stats, key, [date]}) do + {:ok, count} + end end def realtime_stats(pid) do diff --git a/lib/exq/api/server.ex b/lib/exq/api/server.ex index f849356..2066adc 100644 --- a/lib/exq/api/server.ex +++ b/lib/exq/api/server.ex @@ -40,9 +40,9 @@ defmodule Exq.Api.Server do {:reply, {:ok, count}, state} end - def handle_call({:stats, key, date}, _from, state) do - count = JobStat.get_count(state.redis, state.namespace, "#{key}:#{date}") - {:reply, {:ok, count}, state} + def handle_call({:stats, key, dates}, _from, state) do + counts = JobStat.get_counts(state.redis, state.namespace, Enum.map(dates, &"#{key}:#{&1}")) + {:reply, {:ok, counts}, state} end def handle_call(:queues, _from, state) do @@ -50,13 +50,13 @@ defmodule Exq.Api.Server do {:reply, {:ok, queues}, state} end - def handle_call(:failed, _from, state) do - jobs = JobQueue.failed(state.redis, state.namespace) + def handle_call({:failed, options}, _from, state) do + jobs = JobQueue.failed(state.redis, state.namespace, options) {:reply, {:ok, jobs}, state} end - def handle_call(:retries, _from, state) do - jobs = JobQueue.scheduled_jobs(state.redis, state.namespace, "retry") + def handle_call({:retries, options}, _from, state) do + jobs = JobQueue.scheduled_jobs(state.redis, state.namespace, "retry", options) {:reply, {:ok, jobs}, state} end @@ -65,8 +65,8 @@ defmodule Exq.Api.Server do {:reply, {:ok, jobs}, state} end - def handle_call({:jobs, :scheduled}, _from, state) do - jobs = JobQueue.scheduled_jobs(state.redis, state.namespace, "schedule") + def handle_call({:jobs, :scheduled, options}, _from, state) do + jobs = JobQueue.scheduled_jobs(state.redis, state.namespace, "schedule", options) {:reply, {:ok, jobs}, state} end @@ -75,8 +75,8 @@ defmodule Exq.Api.Server do {:reply, {:ok, jobs}, state} end - def handle_call({:jobs, queue}, _from, state) do - jobs = JobQueue.jobs(state.redis, state.namespace, queue) + def handle_call({:jobs, queue, options}, _from, state) do + jobs = JobQueue.jobs(state.redis, state.namespace, queue, options) {:reply, {:ok, jobs}, state} end @@ -110,6 +110,11 @@ defmodule Exq.Api.Server do {:reply, {:ok, job}, state} end + def handle_call({:find_failed, score, jid, options}, _from, state) do + {:ok, job} = JobStat.find_failed(state.redis, state.namespace, score, jid, options) + {:reply, {:ok, job}, state} + end + def handle_call({:find_job, queue, jid}, _from, state) do response = JobQueue.find_job(state.redis, state.namespace, jid, queue) {:reply, response, state} @@ -120,11 +125,21 @@ defmodule Exq.Api.Server do {:reply, {:ok, job}, state} end + def handle_call({:find_scheduled, score, jid, options}, _from, state) do + {:ok, job} = JobStat.find_scheduled(state.redis, state.namespace, score, jid, options) + {:reply, {:ok, job}, state} + end + def handle_call({:find_retry, jid}, _from, state) do {:ok, job} = JobQueue.find_job(state.redis, state.namespace, jid, :retry) {:reply, {:ok, job}, state} end + def handle_call({:find_retry, score, jid, options}, _from, state) do + {:ok, job} = JobStat.find_retry(state.redis, state.namespace, score, jid, options) + {:reply, {:ok, job}, state} + end + def handle_call({:remove_queue, queue}, _from, state) do JobStat.remove_queue(state.redis, state.namespace, queue) {:reply, :ok, state} @@ -135,21 +150,41 @@ defmodule Exq.Api.Server do {:reply, :ok, state} end + def handle_call({:remove_enqueued_jobs, queue, raw_jobs}, _from, state) do + JobQueue.remove_enqueued_jobs(state.redis, state.namespace, queue, raw_jobs) + {:reply, :ok, state} + end + def handle_call({:remove_retry, jid}, _from, state) do JobQueue.remove_retry(state.redis, state.namespace, jid) {:reply, :ok, state} end + def handle_call({:remove_retry_jobs, raw_jobs}, _from, state) do + JobQueue.remove_retry_jobs(state.redis, state.namespace, raw_jobs) + {:reply, :ok, state} + end + def handle_call({:remove_scheduled, jid}, _from, state) do JobQueue.remove_scheduled(state.redis, state.namespace, jid) {:reply, :ok, state} end + def handle_call({:remove_scheduled_jobs, raw_jobs}, _from, state) do + JobQueue.remove_scheduled_jobs(state.redis, state.namespace, raw_jobs) + {:reply, :ok, state} + end + def handle_call({:remove_failed, jid}, _from, state) do JobStat.remove_failed(state.redis, state.namespace, jid) {:reply, :ok, state} end + def handle_call({:remove_failed_jobs, raw_jobs}, _from, state) do + JobQueue.remove_failed_jobs(state.redis, state.namespace, raw_jobs) + {:reply, :ok, state} + end + def handle_call(:clear_failed, _from, state) do JobStat.clear_failed(state.redis, state.namespace) {:reply, :ok, state} diff --git a/lib/exq/redis/connection.ex b/lib/exq/redis/connection.ex index 5abe66b..729be07 100644 --- a/lib/exq/redis/connection.ex +++ b/lib/exq/redis/connection.ex @@ -86,7 +86,14 @@ defmodule Exq.Redis.Connection do end def lrem!(redis, list, value, count \\ 1, options \\ []) do - {:ok, res} = q(redis, ["LREM", list, count, value], options) + {:ok, res} = + if is_list(value) do + commands = Enum.map(value, fn v -> ["LREM", list, count, v] end) + qp(redis, commands, options) + else + q(redis, ["LREM", list, count, value], options) + end + res end @@ -128,6 +135,11 @@ defmodule Exq.Redis.Connection do items end + def zrangebyscorewithlimit!(redis, set, offset, size, min \\ "0", max \\ "+inf") do + {:ok, items} = q(redis, ["ZRANGEBYSCORE", set, min, max, "LIMIT", offset, size]) + items + end + def zrangebyscore(redis, set, min \\ "0", max \\ "+inf") do q(redis, ["ZRANGEBYSCORE", set, min, max]) end @@ -137,15 +149,37 @@ defmodule Exq.Redis.Connection do items end + def zrangebyscorewithscoreandlimit!(redis, set, offset, size, min \\ "0", max \\ "+inf") do + {:ok, items} = q(redis, ["ZRANGEBYSCORE", set, min, max, "WITHSCORES", "LIMIT", offset, size]) + items + end + def zrangebyscorewithscore(redis, set, min \\ "0", max \\ "+inf") do q(redis, ["ZRANGEBYSCORE", set, min, max, "WITHSCORES"]) end + def zrevrangebyscorewithlimit!(redis, set, offset, size, min \\ "0", max \\ "+inf") do + {:ok, items} = q(redis, ["ZREVRANGEBYSCORE", set, max, min, "LIMIT", offset, size]) + items + end + + def zrevrangebyscorewithscoreandlimit!(redis, set, offset, size, min \\ "0", max \\ "+inf") do + {:ok, items} = + q(redis, ["ZREVRANGEBYSCORE", set, max, min, "WITHSCORES", "LIMIT", offset, size]) + + items + end + def zrange!(redis, set, range_start \\ "0", range_end \\ "-1") do {:ok, items} = q(redis, ["ZRANGE", set, range_start, range_end]) items end + def zrem!(redis, set, members) when is_list(members) do + {:ok, res} = q(redis, ["ZREM", set | members]) + res + end + def zrem!(redis, set, member) do {:ok, res} = q(redis, ["ZREM", set, member]) res diff --git a/lib/exq/redis/job_queue.ex b/lib/exq/redis/job_queue.ex index 0d39a28..6c6d53d 100644 --- a/lib/exq/redis/job_queue.ex +++ b/lib/exq/redis/job_queue.ex @@ -20,6 +20,8 @@ defmodule Exq.Redis.JobQueue do alias Exq.Support.Config alias Exq.Support.Time + @default_size 100 + def enqueue(redis, namespace, queue, worker, args, options) do {jid, job_serialized} = to_job_serialized(queue, worker, args, options) @@ -314,25 +316,56 @@ defmodule Exq.Redis.JobQueue do for q <- queues, do: {q, jobs(redis, namespace, q)} end - def jobs(redis, namespace, queue) do - Connection.lrange!(redis, queue_key(namespace, queue)) - |> Enum.map(&Job.decode/1) + def jobs(redis, namespace, queue, options \\ []) do + range_start = Keyword.get(options, :offset, 0) + range_end = range_start + Keyword.get(options, :size, @default_size) - 1 + + Connection.lrange!(redis, queue_key(namespace, queue), range_start, range_end) + |> maybe_decode(options) end - def scheduled_jobs(redis, namespace, queue) do - Connection.zrangebyscore!(redis, full_key(namespace, queue)) - |> Enum.map(&Job.decode/1) + def scheduled_jobs(redis, namespace, queue, options \\ []) do + if Keyword.get(options, :score, false) do + scheduled_jobs_with_scores(redis, namespace, queue, options) + else + Connection.zrangebyscorewithlimit!( + redis, + full_key(namespace, queue), + Keyword.get(options, :offset, 0), + Keyword.get(options, :size, @default_size) + ) + |> maybe_decode(options) + end end - def scheduled_jobs_with_scores(redis, namespace, queue) do - Connection.zrangebyscorewithscore!(redis, full_key(namespace, queue)) - |> Enum.chunk_every(2) - |> Enum.map(fn [job, score] -> {Job.decode(job), score} end) + def scheduled_jobs_with_scores(redis, namespace, queue, options \\ []) do + Connection.zrangebyscorewithscoreandlimit!( + redis, + full_key(namespace, queue), + Keyword.get(options, :offset, 0), + Keyword.get(options, :size, @default_size) + ) + |> decode_zset_withscores(options) end - def failed(redis, namespace) do - Connection.zrange!(redis, failed_queue_key(namespace)) - |> Enum.map(&Job.decode/1) + def failed(redis, namespace, options \\ []) do + if Keyword.get(options, :score, false) do + Connection.zrevrangebyscorewithscoreandlimit!( + redis, + failed_queue_key(namespace), + Keyword.get(options, :offset, 0), + Keyword.get(options, :size, @default_size) + ) + |> decode_zset_withscores(options) + else + Connection.zrevrangebyscorewithlimit!( + redis, + failed_queue_key(namespace), + Keyword.get(options, :offset, 0), + Keyword.get(options, :size, @default_size) + ) + |> maybe_decode(options) + end end def retry_size(redis, namespace) do @@ -347,6 +380,10 @@ defmodule Exq.Redis.JobQueue do Connection.zcard!(redis, failed_queue_key(namespace)) end + def remove_enqueued_jobs(redis, namespace, queue, raw_jobs) do + Connection.lrem!(redis, queue_key(namespace, queue), raw_jobs) + end + def remove_job(redis, namespace, queue, jid) do {:ok, job} = find_job(redis, namespace, jid, queue, false) Connection.lrem!(redis, queue_key(namespace, queue), job) @@ -357,11 +394,23 @@ defmodule Exq.Redis.JobQueue do Connection.zrem!(redis, retry_queue_key(namespace), job) end + def remove_retry_jobs(redis, namespace, raw_jobs) do + Connection.zrem!(redis, retry_queue_key(namespace), raw_jobs) + end + def remove_scheduled(redis, namespace, jid) do {:ok, job} = find_job(redis, namespace, jid, :scheduled, false) Connection.zrem!(redis, scheduled_queue_key(namespace), job) end + def remove_scheduled_jobs(redis, namespace, raw_jobs) do + Connection.zrem!(redis, scheduled_queue_key(namespace), raw_jobs) + end + + def remove_failed_jobs(redis, namespace, raw_jobs) do + Connection.zrem!(redis, failed_queue_key(namespace), raw_jobs) + end + def list_queues(redis, namespace) do Connection.smembers!(redis, full_key(namespace, "queues")) end @@ -461,4 +510,25 @@ defmodule Exq.Redis.JobQueue do %{job | retried_at: timestamp} end + + defp decode_zset_withscores(list, options) do + raw? = Keyword.get(options, :raw, false) + + Enum.chunk_every(list, 2) + |> Enum.map(fn [job, score] -> + if raw? do + {job, score} + else + {Job.decode(job), score} + end + end) + end + + defp maybe_decode(list, options) do + if Keyword.get(options, :raw, false) do + list + else + Enum.map(list, &Job.decode/1) + end + end end diff --git a/lib/exq/redis/job_stat.ex b/lib/exq/redis/job_stat.ex index 5395748..351dc4a 100644 --- a/lib/exq/redis/job_stat.ex +++ b/lib/exq/redis/job_stat.ex @@ -90,6 +90,18 @@ defmodule Exq.Redis.JobStat do |> JobQueue.search_jobs(jid) end + def find_failed(redis, namespace, score, jid, options) do + find_by_score_and_jid(redis, JobQueue.full_key(namespace, "dead"), score, jid, options) + end + + def find_retry(redis, namespace, score, jid, options) do + find_by_score_and_jid(redis, JobQueue.full_key(namespace, "retry"), score, jid, options) + end + + def find_scheduled(redis, namespace, score, jid, options) do + find_by_score_and_jid(redis, JobQueue.full_key(namespace, "schedule"), score, jid, options) + end + def remove_queue(redis, namespace, queue) do Connection.qp(redis, [ ["SREM", JobQueue.full_key(namespace, "queues"), queue], @@ -162,19 +174,29 @@ defmodule Exq.Redis.JobStat do end def get_count(redis, namespace, key) do - case Connection.get!(redis, JobQueue.full_key(namespace, "stat:#{key}")) do - :undefined -> - 0 + Connection.get!(redis, JobQueue.full_key(namespace, "stat:#{key}")) + |> decode_integer() + end - nil -> - 0 + def get_counts(redis, namespace, keys) do + {:ok, results} = + Connection.q(redis, ["MGET" | Enum.map(keys, &JobQueue.full_key(namespace, "stat:#{&1}"))]) - count when is_integer(count) -> - count + Enum.map(results, &decode_integer/1) + end - count -> - {val, _} = Integer.parse(count) - val - end + def decode_integer(:undefined), do: 0 + def decode_integer(nil), do: 0 + def decode_integer(count) when is_integer(count), do: count + + def decode_integer(count) when is_binary(count) do + {count, _} = Integer.parse(count) + count + end + + defp find_by_score_and_jid(redis, zset, score, jid, options) do + redis + |> Connection.zrangebyscore!(zset, score, score) + |> JobQueue.search_jobs(jid, !Keyword.get(options, :raw, false)) end end diff --git a/test/api_test.exs b/test/api_test.exs index 4ee2ff1..48ddd4f 100644 --- a/test/api_test.exs +++ b/test/api_test.exs @@ -60,6 +60,13 @@ defmodule ApiTest do assert {:ok, 1} = Exq.Api.stats(Exq.Api, "failed") assert {:ok, 1} = Exq.Api.stats(Exq.Api, "processed") + assert {:ok, 1} = Exq.Api.stats(Exq.Api, "processed", Date.to_string(Date.utc_today())) + + assert {:ok, [1, 0]} = + Exq.Api.stats(Exq.Api, "failed", [ + Date.to_string(Date.utc_today()), + Date.to_string(Date.utc_today() |> Date.add(-1)) + ]) end test "processes when empty" do @@ -96,6 +103,12 @@ defmodule ApiTest do assert Enum.count(jobs) == 2 assert Enum.find(jobs, fn job -> job.jid == jid1 end) assert Enum.find(jobs, fn job -> job.jid == jid2 end) + + {:ok, [job]} = Exq.Api.jobs(Exq.Api, "custom", size: 1, offset: 1) + assert job.jid == jid1 + + {:ok, [json]} = Exq.Api.jobs(Exq.Api, "custom", size: 1, raw: true) + assert Jason.decode!(json)["jid"] == jid2 end test "failed when empty" do @@ -103,10 +116,14 @@ defmodule ApiTest do end test "failed with data" do - JobQueue.fail_job(:testredis, 'test', %Job{jid: "1234"}, "this is an error") + JobQueue.fail_job(:testredis, 'test', %Job{jid: "1"}, "this is an error") + JobQueue.fail_job(:testredis, 'test', %Job{jid: "2"}, "this is an error") {:ok, jobs} = Exq.Api.failed(Exq.Api) - assert Enum.count(jobs) == 1 - assert Enum.at(jobs, 0).jid == "1234" + assert Enum.count(jobs) == 2 + assert Enum.at(jobs, 0).jid == "2" + + {:ok, [json]} = Exq.Api.failed(Exq.Api, raw: true, size: 1, offset: 1) + assert Jason.decode!(json)["jid"] == "1" end test "retry when empty" do @@ -114,10 +131,14 @@ defmodule ApiTest do end test "retry with data" do - JobQueue.retry_job(:testredis, 'test', %Job{jid: "1234"}, 1, "this is an error") + JobQueue.retry_job(:testredis, "test", %Job{jid: "1"}, 1, "this is an error") + JobQueue.retry_job(:testredis, "test", %Job{jid: "2"}, 1, "this is an error") {:ok, jobs} = Exq.Api.retries(Exq.Api) - assert Enum.count(jobs) == 1 - assert Enum.at(jobs, 0).jid == "1234" + assert Enum.count(jobs) == 2 + assert Enum.at(jobs, 0).jid == "1" + + {:ok, [job]} = Exq.Api.retries(Exq.Api, size: 1, raw: true, offset: 1) + assert Jason.decode!(job)["jid"] == "2" end test "scheduled when empty" do @@ -125,10 +146,14 @@ defmodule ApiTest do end test "scheduled with data" do - {:ok, jid} = Exq.enqueue_in(Exq, 'custom', 1000, Bogus, []) + {:ok, jid1} = Exq.enqueue_in(Exq, 'custom', 1000, Bogus, []) + {:ok, jid2} = Exq.enqueue_in(Exq, 'custom', 1000, Bogus, []) {:ok, jobs} = Exq.Api.scheduled(Exq.Api) - assert Enum.count(jobs) == 1 - assert Enum.at(jobs, 0).jid == jid + assert Enum.count(jobs) == 2 + assert Enum.at(jobs, 0).jid == jid1 + + {:ok, [job]} = Exq.Api.scheduled(Exq.Api, size: 1, raw: true, offset: 1) + assert Jason.decode!(job)["jid"] == jid2 end test "scheduled with scores and data" do @@ -153,18 +178,30 @@ defmodule ApiTest do JobQueue.retry_job(:testredis, 'test', %Job{jid: "1234"}, 1, "this is an error") {:ok, job} = Exq.Api.find_retry(Exq.Api, "1234") assert job.jid == "1234" + + {:ok, [{job, score}]} = Exq.Api.retries(Exq.Api, score: true) + {:ok, job} = Exq.Api.find_retry(Exq.Api, score, job.jid) + assert job.jid == "1234" end test "find job in scheduled queue" do {:ok, jid} = Exq.enqueue_in(Exq, 'custom', 1000, Bogus, []) {:ok, job} = Exq.Api.find_scheduled(Exq.Api, jid) assert job.jid == jid + + {:ok, [{_, score}]} = Exq.Api.scheduled(Exq.Api, score: true) + {:ok, job} = Exq.Api.find_scheduled(Exq.Api, score, jid) + assert job.jid == jid end test "find job in failed queue" do JobQueue.fail_job(:testredis, 'test', %Job{jid: "1234"}, "this is an error") {:ok, job} = Exq.Api.find_failed(Exq.Api, "1234") assert job.jid == "1234" + + {:ok, [{_job, score}]} = Exq.Api.failed(Exq.Api, score: true) + {:ok, job} = Exq.Api.find_failed(Exq.Api, score, "1234") + assert job.jid == "1234" end test "remove job" do @@ -173,6 +210,14 @@ defmodule ApiTest do assert {:ok, nil} = Exq.Api.find_job(Exq.Api, 'custom', jid) end + test "remove enqueued jobs" do + {:ok, _} = Exq.enqueue(Exq, "custom", Bogus, []) + assert {:ok, 1} = Exq.Api.queue_size(Exq.Api, "custom") + {:ok, [job]} = Exq.Api.jobs(Exq.Api, "custom", raw: true) + :ok = Exq.Api.remove_enqueued_jobs(Exq.Api, "custom", [job]) + assert {:ok, 0} = Exq.Api.queue_size(Exq.Api, "custom") + end + test "remove job in retry queue" do jid = "1234" JobQueue.retry_job(:testredis, 'test', %Job{jid: "1234"}, 1, "this is an error") @@ -180,18 +225,40 @@ defmodule ApiTest do assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid) end + test "remove jobs in retry queue" do + jid = "1234" + JobQueue.retry_job(:testredis, 'test', %Job{jid: "1234"}, 1, "this is an error") + {:ok, [raw_job]} = Exq.Api.retries(Exq.Api, raw: true) + Exq.Api.remove_retry_jobs(Exq.Api, [raw_job]) + assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid) + end + test "remove job in scheduled queue" do {:ok, jid} = Exq.enqueue_in(Exq, 'custom', 1000, Bogus, []) Exq.Api.remove_scheduled(Exq.Api, jid) assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid) end + test "remove jobs in scheduled queue" do + {:ok, jid} = Exq.enqueue_in(Exq, 'custom', 1000, Bogus, []) + {:ok, [raw_job]} = Exq.Api.scheduled(Exq.Api, raw: true) + Exq.Api.remove_scheduled_jobs(Exq.Api, [raw_job]) + assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid) + end + test "remove job in failed queue" do JobQueue.fail_job(:testredis, 'test', %Job{jid: "1234"}, "this is an error") Exq.Api.remove_failed(Exq.Api, "1234") {:ok, nil} = Exq.Api.find_failed(Exq.Api, "1234") end + test "remove jobs in failed queue" do + JobQueue.fail_job(:testredis, 'test', %Job{jid: "1234"}, "this is an error") + {:ok, [raw_job]} = Exq.Api.failed(Exq.Api, raw: true) + Exq.Api.remove_failed_jobs(Exq.Api, [raw_job]) + {:ok, nil} = Exq.Api.find_failed(Exq.Api, "1234") + end + test "clear job queue" do {:ok, jid} = Exq.enqueue(Exq, 'custom', Bogus, []) Exq.Api.remove_queue(Exq.Api, 'custom')