Skip to content

Commit

Permalink
update mock module to make use of enqueue_all
Browse files Browse the repository at this point in the history
  • Loading branch information
ananthakumaran committed Jan 25, 2023
1 parent 90c8ff9 commit 451629f
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 28 deletions.
89 changes: 62 additions & 27 deletions lib/exq/mock.ex
Expand Up @@ -98,47 +98,55 @@ defmodule Exq.Mock do
runnable.()
end

@doc false
def enqueue_all(pid, jobs) do
{
:ok,
Enum.map(jobs, fn [queue, worker, args, options] ->
case options[:schedule] do
{:at, at_time} -> enqueue_at(pid, queue, at_time, worker, args, options)
{:in, offset} -> enqueue_in(pid, queue, offset, worker, args, options)
_ -> enqueue(pid, queue, worker, args, options)
end
end)
}
{:ok, runnable} = GenServer.call(__MODULE__, {:enqueue_all, self(), pid, jobs}, @timeout)
runnable.()
end

@impl true
def handle_call({:enqueue, owner_pid, type, args}, _from, state) do
state = maybe_add_and_monitor_pid(state, owner_pid, state.default_mode)
{state, runnable} = to_runnable(owner_pid, type, args, state)
{:reply, {:ok, runnable}, state}
end

case state.modes[owner_pid] do
:redis ->
runnable = fn -> apply(Redis, type, args) end
{:reply, {:ok, runnable}, state}
@impl true
def handle_call({:enqueue_all, owner_pid, pid, jobs}, _from, state) do
state = maybe_add_and_monitor_pid(state, owner_pid, state.default_mode)

:inline ->
runnable = fn ->
job = to_job(type, args)
apply(Coercion.to_module(job.class), :perform, job.args)
{:ok, job.jid}
end
{state, runnable} =
if state.modes[owner_pid] == :redis do
to_runnable(owner_pid, :enqueue_all, [pid, jobs], state)
else
{state, runnables} =
Enum.reduce(jobs, {state, []}, fn [queue, worker, args, options], {state, runnables} ->
{type, args} =
case options[:schedule] do
{:at, at_time} ->
{:enqueue_at, [pid, queue, at_time, worker, args, options]}

{:reply, {:ok, runnable}, state}
{:in, offset} ->
{:enqueue_in, [pid, queue, offset, worker, args, options]}

:fake ->
job = to_job(type, args)
state = update_in(state.jobs[owner_pid], &((&1 || []) ++ [job]))
_ ->
{:enqueue, [pid, queue, worker, args, options]}
end

{state, runnable} = to_runnable(owner_pid, type, args, state)
{state, [runnable | runnables]}
end)

runnables = Enum.reverse(runnables)

runnable = fn ->
{:ok, job.jid}
{:ok, Enum.map(runnables, fn f -> f.() end)}
end

{:reply, {:ok, runnable}, state}
end
{state, runnable}
end

{:reply, {:ok, runnable}, state}
end

def handle_call({:mode, owner_pid, mode}, _from, state) do
Expand All @@ -158,6 +166,33 @@ defmodule Exq.Mock do
{:noreply, state}
end

defp to_runnable(owner_pid, type, args, state) do
case state.modes[owner_pid] do
:redis ->
runnable = fn -> apply(Redis, type, args) end
{state, runnable}

:inline ->
runnable = fn ->
job = to_job(type, args)
apply(Coercion.to_module(job.class), :perform, job.args)
{:ok, job.jid}
end

{state, runnable}

:fake ->
job = to_job(type, args)
state = update_in(state.jobs[owner_pid], &((&1 || []) ++ [job]))

runnable = fn ->
{:ok, job.jid}
end

{state, runnable}
end
end

defp to_job(_, [_pid, queue, worker, args, options]) do
%Job{
jid: Keyword.get_lazy(options, :jid, fn -> UUID.uuid4() end),
Expand Down
1 change: 0 additions & 1 deletion test/exq_test.exs
Expand Up @@ -141,7 +141,6 @@ defmodule ExqTest do
test "enqueue_all and run many jobs" do
Process.register(self(), :exqtest)
{:ok, sup} = Exq.start_link(scheduler_enable: true)
{:ok, _} = Exq.enqueue_at(Exq, "default", DateTime.utc_now(), ExqTest.PerformWorker, [])

{:ok, [{:ok, _}, {:ok, _}, {:ok, _}]} =
Exq.enqueue_all(Exq, [
Expand Down

0 comments on commit 451629f

Please sign in to comment.