Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
11 changed files
with
335 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
defmodule Exq.Adapters.Queue do | ||
@moduledoc ~S""" | ||
Behaviour for creating Exq queue adapters | ||
## Example | ||
defmodule Exq.Adapters.Queue.CustomAdapter do | ||
@behaviour Exq.Adapters.Queue | ||
def enqueue(pid, queue, worker, args, options) do | ||
{:ok, apply(worker, :perform, args)} | ||
end | ||
def enqueue_at(pid, queue, time, worker, args, options) do | ||
enqueue_somehow(pid, queue, time, worker, args, options) | ||
end | ||
def enqueue_in(pid, queue, offset, worker, args, options) do | ||
enqueue_in_somehow(pid, queue, offset, worker, args, options) | ||
end | ||
end | ||
""" | ||
|
||
@typedoc "The GenServer name" | ||
@type name :: atom | {:global, term} | {:via, module, term} | ||
|
||
@typedoc "The server reference" | ||
@type server :: pid | name | {atom, node} | ||
|
||
@callback enqueue(server, String.t(), module(), list(), list()) :: tuple() | ||
@callback enqueue_at(server, String.t(), DateTime.t(), module(), list(), list()) :: tuple() | ||
@callback enqueue_in(server, String.t(), integer(), module(), list(), list()) :: tuple() | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
defmodule Exq.Adapters.Queue.Mock do | ||
@moduledoc """ | ||
Mock queue. Designed to be used when testing your application. | ||
""" | ||
|
||
@behaviour Exq.Adapters.Queue | ||
|
||
defdelegate enqueue(pid, queue, worker, args, options), to: Exq.Mock | ||
|
||
defdelegate enqueue_at(pid, queue, time, worker, args, options), to: Exq.Mock | ||
|
||
defdelegate enqueue_in(pid, queue, offset, worker, args, options), to: Exq.Mock | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
defmodule Exq.Adapters.Queue.Redis do | ||
@moduledoc """ | ||
Redis based Asynchronous queue. Enqueue the job by using the GenServer API. | ||
Default queue. Designed to be used in production. | ||
""" | ||
alias Exq.Support.Config | ||
alias Exq.Redis.JobQueue | ||
|
||
@behaviour Exq.Adapters.Queue | ||
|
||
def enqueue(pid, queue, worker, args, options) do | ||
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout)) | ||
JobQueue.enqueue(redis, namespace, queue, worker, args, options) | ||
end | ||
|
||
def enqueue_at(pid, queue, time, worker, args, options) do | ||
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout)) | ||
JobQueue.enqueue_at(redis, namespace, queue, time, worker, args, options) | ||
end | ||
|
||
def enqueue_in(pid, queue, offset, worker, args, options) do | ||
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout)) | ||
JobQueue.enqueue_in(redis, namespace, queue, offset, worker, args, options) | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
defmodule Exq.Mock do | ||
alias Exq.Support.Config | ||
alias Exq.Adapters.Queue.Redis | ||
alias Exq.Support.Job | ||
use GenServer | ||
@timeout 30000 | ||
|
||
defmodule State do | ||
defstruct default_mode: :redis, jobs: %{}, modes: %{} | ||
end | ||
|
||
### Public api | ||
|
||
def start_link(options \\ []) do | ||
queue_adapter = Config.get(:queue_adapter) | ||
|
||
if queue_adapter != Exq.Adapters.Queue.Mock do | ||
raise RuntimeError, """ | ||
Exq.Mock can only work if queue_adapter is set to Exq.Adapters.Queue.Mock | ||
Add the following to your test config | ||
config :exq, queue_adapter: Exq.Adapters.Queue.Mock | ||
""" | ||
end | ||
|
||
GenServer.start_link(__MODULE__, options, name: __MODULE__) | ||
end | ||
|
||
def set_mode(mode) when mode in [:redis, :inline, :fake] do | ||
GenServer.call(__MODULE__, {:mode, self(), mode}, @timeout) | ||
end | ||
|
||
def jobs do | ||
GenServer.call(__MODULE__, {:jobs, self()}, @timeout) | ||
end | ||
|
||
### Private | ||
|
||
@impl true | ||
def init(options) do | ||
{:ok, %State{default_mode: Keyword.get(options, :mode, :redis)}} | ||
end | ||
|
||
def enqueue(pid, queue, worker, args, options) do | ||
{:ok, runnable} = | ||
GenServer.call( | ||
__MODULE__, | ||
{:enqueue, self(), :enqueue, [pid, queue, worker, args, options]}, | ||
@timeout | ||
) | ||
|
||
runnable.() | ||
end | ||
|
||
def enqueue_at(pid, queue, time, worker, args, options) do | ||
{:ok, runnable} = | ||
GenServer.call( | ||
__MODULE__, | ||
{:enqueue, self(), :enqueue_at, [pid, queue, time, worker, args, options]}, | ||
@timeout | ||
) | ||
|
||
runnable.() | ||
end | ||
|
||
def enqueue_in(pid, queue, offset, worker, args, options) do | ||
{:ok, runnable} = | ||
GenServer.call( | ||
__MODULE__, | ||
{:enqueue, self(), :enqueue_in, [pid, queue, offset, worker, args, options]}, | ||
@timeout | ||
) | ||
|
||
runnable.() | ||
end | ||
|
||
@impl true | ||
def handle_call({:enqueue, owner_pid, type, args}, _from, state) do | ||
state = maybe_add_and_monitor_pid(state, owner_pid, state.default_mode) | ||
|
||
case state.modes[owner_pid] do | ||
:redis -> | ||
runnable = fn -> apply(Redis, type, args) end | ||
{:reply, {:ok, runnable}, state} | ||
|
||
:inline -> | ||
runnable = fn -> | ||
job = to_job(args) | ||
apply(job.class, :perform, job.args) | ||
{:ok, job.jid} | ||
end | ||
|
||
{:reply, {:ok, runnable}, state} | ||
|
||
:fake -> | ||
job = to_job(args) | ||
state = update_in(state.jobs[owner_pid], &((&1 || []) ++ [job])) | ||
|
||
runnable = fn -> | ||
{:ok, job.jid} | ||
end | ||
|
||
{:reply, {:ok, runnable}, state} | ||
end | ||
end | ||
|
||
def handle_call({:mode, owner_pid, mode}, _from, state) do | ||
state = maybe_add_and_monitor_pid(state, owner_pid, mode) | ||
{:reply, :ok, state} | ||
end | ||
|
||
def handle_call({:jobs, owner_pid}, _from, state) do | ||
jobs = state.jobs[owner_pid] || [] | ||
{:reply, jobs, state} | ||
end | ||
|
||
@impl true | ||
def handle_info({:DOWN, _, _, pid, _}, state) do | ||
{_, state} = pop_in(state.modes[pid]) | ||
{_, state} = pop_in(state.jobs[pid]) | ||
{:noreply, state} | ||
end | ||
|
||
defp to_job([_pid, queue, worker, args, _options]) do | ||
%Job{ | ||
jid: UUID.uuid4(), | ||
queue: queue, | ||
class: worker, | ||
args: args, | ||
enqueued_at: DateTime.utc_now() | ||
} | ||
end | ||
|
||
defp to_job([_pid, queue, _time_or_offset, worker, args, _options]) do | ||
%Job{ | ||
jid: UUID.uuid4(), | ||
queue: queue, | ||
class: worker, | ||
args: args, | ||
enqueued_at: DateTime.utc_now() | ||
} | ||
end | ||
|
||
defp maybe_add_and_monitor_pid(state, pid, mode) do | ||
case state.modes do | ||
%{^pid => _mode} -> | ||
state | ||
|
||
_ -> | ||
Process.monitor(pid) | ||
state = put_in(state.modes[pid], mode) | ||
state | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
defmodule FakeModeTest do | ||
use ExUnit.Case, async: true | ||
|
||
defmodule BrokenWorker do | ||
def perform(_) do | ||
raise RuntimeError, "Unexpected" | ||
end | ||
end | ||
|
||
setup do | ||
Exq.Mock.set_mode(:fake) | ||
end | ||
|
||
describe "fake mode" do | ||
test "enqueue should " do | ||
assert [] = Exq.Mock.jobs() | ||
assert {:ok, _} = Exq.enqueue(Exq, "low", BrokenWorker, [1]) | ||
assert {:ok, _} = Exq.enqueue_at(Exq, "low", DateTime.utc_now(), BrokenWorker, [2]) | ||
assert {:ok, _} = Exq.enqueue_in(Exq, "low", 300, BrokenWorker, [3]) | ||
|
||
assert [ | ||
%Exq.Support.Job{ | ||
args: [1], | ||
class: FakeModeTest.BrokenWorker, | ||
queue: "low" | ||
}, | ||
%Exq.Support.Job{ | ||
args: [2], | ||
class: FakeModeTest.BrokenWorker, | ||
queue: "low" | ||
}, | ||
%Exq.Support.Job{ | ||
args: [3], | ||
class: FakeModeTest.BrokenWorker, | ||
queue: "low" | ||
} | ||
] = Exq.Mock.jobs() | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
defmodule InlineModeTest do | ||
use ExUnit.Case, async: true | ||
|
||
defmodule EchoWorker do | ||
def perform(value), do: value | ||
end | ||
|
||
setup do | ||
Exq.Mock.set_mode(:inline) | ||
end | ||
|
||
describe "inline mode" do | ||
test "enqueue should return the correct value" do | ||
assert {:ok, _} = Exq.enqueue(Exq, "low", EchoWorker, [1]) | ||
end | ||
|
||
test "enqueue_at should return the correct value" do | ||
assert {:ok, _} = Exq.enqueue_at(Exq, "low", DateTime.utc_now(), EchoWorker, [1]) | ||
end | ||
|
||
test "enqueue_in should return the correct value" do | ||
assert {:ok, _} = Exq.enqueue_in(Exq, "low", 300, EchoWorker, [1]) | ||
end | ||
end | ||
end |
Oops, something went wrong.