From 543d6262dd4b33fa98c28726b61f1bd4a6eda9c0 Mon Sep 17 00:00:00 2001 From: samidarko Date: Tue, 16 Apr 2019 12:57:25 +0800 Subject: [PATCH 1/8] feat: queue adapter --- lib/exq/adapters/queue.ex | 10 +++++++ lib/exq/adapters/queue/gen_server.ex | 42 ++++++++++++++++++++++++++++ lib/exq/adapters/queue/test.ex | 29 +++++++++++++++++++ lib/exq/enqueue_api.ex | 30 ++++++++------------ lib/exq/support/config.ex | 3 +- test/adapters_test.ex | 35 +++++++++++++++++++++++ 6 files changed, 130 insertions(+), 19 deletions(-) create mode 100644 lib/exq/adapters/queue.ex create mode 100644 lib/exq/adapters/queue/gen_server.ex create mode 100644 lib/exq/adapters/queue/test.ex create mode 100644 test/adapters_test.ex diff --git a/lib/exq/adapters/queue.ex b/lib/exq/adapters/queue.ex new file mode 100644 index 00000000..72bb491e --- /dev/null +++ b/lib/exq/adapters/queue.ex @@ -0,0 +1,10 @@ +defmodule Exq.Adapters.Queue do + @moduledoc false + + @callback enqueue(any(), String.t(), module(), list(), list()) :: tuple() + @callback enqueue(any(), any(), String.t(), module(), list(), list()) :: tuple() + @callback enqueue_at(any(), String.t(), any(), module(), list(), list()) :: tuple() + @callback enqueue_at(any(), any(), String.t(), any(), module(), list(), list()) :: tuple() + @callback enqueue_in(any(), String.t(), any(), module(), list(), list()) :: tuple() + @callback enqueue_in(any(), any(), String.t(), any(), module(), list(), list()) :: tuple() +end diff --git a/lib/exq/adapters/queue/gen_server.ex b/lib/exq/adapters/queue/gen_server.ex new file mode 100644 index 00000000..8e5c5ecf --- /dev/null +++ b/lib/exq/adapters/queue/gen_server.ex @@ -0,0 +1,42 @@ +defmodule Exq.Adapters.Queue.GenServer do + @moduledoc false + alias Exq.Support.Config + + @behaviour Exq.Adapters.Queue + + def enqueue(pid, queue, worker, args, options) do + GenServer.call( + pid, + {:enqueue, queue, worker, args, options}, + Config.get(:genserver_timeout) + ) + end + + def enqueue(pid, from, queue, worker, args, options) do + GenServer.cast(pid, {:enqueue, from, queue, worker, args, options}) + end + + def enqueue_at(pid, queue, time, worker, args, options) do + GenServer.call( + pid, + {:enqueue_at, queue, time, worker, args, options}, + Config.get(:genserver_timeout) + ) + end + + def enqueue_at(pid, from, queue, time, worker, args, options) do + GenServer.cast(pid, {:enqueue_at, from, queue, time, worker, args, options}) + end + + def enqueue_in(pid, queue, offset, worker, args, options) do + GenServer.call( + pid, + {:enqueue_in, queue, offset, worker, args, options}, + Config.get(:genserver_timeout) + ) + end + + def enqueue_in(pid, from, queue, offset, worker, args, options) do + GenServer.cast(pid, {:enqueue_in, from, queue, offset, worker, args, options}) + end +end diff --git a/lib/exq/adapters/queue/test.ex b/lib/exq/adapters/queue/test.ex new file mode 100644 index 00000000..c679654d --- /dev/null +++ b/lib/exq/adapters/queue/test.ex @@ -0,0 +1,29 @@ +defmodule Exq.Adapters.Queue.Test do + @moduledoc false + + @behaviour Exq.Adapters.Queue + + def enqueue(_pid, _queue, worker, args, _options) do + {:ok, apply(worker, :perform, args)} + end + + def enqueue(pid, _from, queue, worker, args, options) do + enqueue(pid, queue, worker, args, options) + end + + def enqueue_at(pid, queue, _time, worker, args, options) do + enqueue(pid, queue, worker, args, options) + end + + def enqueue_at(pid, _from, queue, _time, worker, args, options) do + enqueue(pid, queue, worker, args, options) + end + + def enqueue_in(pid, queue, _offset, worker, args, options) do + enqueue(pid, queue, worker, args, options) + end + + def enqueue_in(pid, _from, queue, _offset, worker, args, options) do + enqueue(pid, queue, worker, args, options) + end +end diff --git a/lib/exq/enqueue_api.ex b/lib/exq/enqueue_api.ex index 6cbe4888..8bcb7bd2 100644 --- a/lib/exq/enqueue_api.ex +++ b/lib/exq/enqueue_api.ex @@ -33,15 +33,13 @@ defmodule Exq.Enqueuer.EnqueueApi do end def enqueue(pid, queue, worker, args, options) do - GenServer.call( - pid, - {:enqueue, queue, worker, args, options}, - Config.get(:genserver_timeout) - ) + queue_adapter = Config.get(:queue_adapter) + queue_adapter.enqueue(pid, queue, worker, args, options) end def enqueue(pid, from, queue, worker, args, options) do - GenServer.cast(pid, {:enqueue, from, queue, worker, args, options}) + queue_adapter = Config.get(:queue_adapter) + queue_adapter.enqueue(pid, from, queue, worker, args, options) end @doc """ @@ -64,15 +62,13 @@ defmodule Exq.Enqueuer.EnqueueApi do end def enqueue_at(pid, queue, time, worker, args, options) do - GenServer.call( - pid, - {:enqueue_at, queue, time, worker, args, options}, - Config.get(:genserver_timeout) - ) + queue_adapter = Config.get(:queue_adapter) + queue_adapter.enqueue_at(pid, queue, time, worker, args, options) end def enqueue_at(pid, from, queue, time, worker, args, options) do - GenServer.cast(pid, {:enqueue_at, from, queue, time, worker, args, options}) + queue_adapter = Config.get(:queue_adapter) + queue_adapter.enqueue_at(pid, from, queue, time, worker, args, options) end @doc """ @@ -95,15 +91,13 @@ defmodule Exq.Enqueuer.EnqueueApi do end def enqueue_in(pid, queue, offset, worker, args, options) do - GenServer.call( - pid, - {:enqueue_in, queue, offset, worker, args, options}, - Config.get(:genserver_timeout) - ) + queue_adapter = Config.get(:queue_adapter) + queue_adapter.enqueue_in(pid, queue, offset, worker, args, options) end def enqueue_in(pid, from, queue, offset, worker, args, options) do - GenServer.cast(pid, {:enqueue_in, from, queue, offset, worker, args, options}) + queue_adapter = Config.get(:queue_adapter) + queue_adapter.enqueue_in(pid, from, queue, offset, worker, args, options) end end end diff --git a/lib/exq/support/config.ex b/lib/exq/support/config.ex index 18ab03d0..6ce018b1 100644 --- a/lib/exq/support/config.ex +++ b/lib/exq/support/config.ex @@ -30,7 +30,8 @@ defmodule Exq.Support.Config do Exq.Middleware.Job, Exq.Middleware.Manager, Exq.Middleware.Logger - ] + ], + queue_adapter: Exq.Adapters.Queue.GenServer } def get(key) do diff --git a/test/adapters_test.ex b/test/adapters_test.ex new file mode 100644 index 00000000..b95410ea --- /dev/null +++ b/test/adapters_test.ex @@ -0,0 +1,35 @@ +defmodule AdaptersTest do + use ExUnit.Case + + alias Exq.Adapters.Queue.Test, as: QueueTest + + defmodule EchoWorker do + def perform(value), do: value + end + + describe "Exq.Adapters.Queue.Test" do + test "enqueue/5 should return the correct value" do + assert {:ok, 1} == QueueTest.enqueue(nil, nil, EchoWorker, [1], nil) + end + + test "enqueue/6 should return the correct value" do + assert {:ok, 2} == QueueTest.enqueue(nil, nil, nil, EchoWorker, [2], nil) + end + + test "enqueue_at/6 should return the correct value" do + assert {:ok, 3} == QueueTest.enqueue_at(nil, nil, nil, EchoWorker, [3], nil) + end + + test "enqueue_at/7 should return the correct value" do + assert {:ok, 4} == QueueTest.enqueue_at(nil, nil, nil, nil, EchoWorker, [4], nil) + end + + test "enqueue_in/6 should return the correct value" do + assert {:ok, 5} == QueueTest.enqueue_in(nil, nil, nil, EchoWorker, [5], nil) + end + + test "enqueue_in/7 should return the correct value" do + assert {:ok, 6} == QueueTest.enqueue_in(nil, nil, nil, nil, EchoWorker, [6], nil) + end + end +end From df8edc47662b23da30345d209af57576c5f4189f Mon Sep 17 00:00:00 2001 From: samidarko Date: Tue, 16 Apr 2019 13:37:53 +0800 Subject: [PATCH 2/8] test: rename adapter_test.ex to adapter_test.exs --- test/{adapters_test.ex => adapters_test.exs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename test/{adapters_test.ex => adapters_test.exs} (100%) diff --git a/test/adapters_test.ex b/test/adapters_test.exs similarity index 100% rename from test/adapters_test.ex rename to test/adapters_test.exs From 7fa8717cc4864f0cae663d50806d899b24a3ce18 Mon Sep 17 00:00:00 2001 From: samidarko Date: Tue, 7 May 2019 13:57:51 +0800 Subject: [PATCH 3/8] refactor: renamed Test queue to Fake and GenServer to Async --- lib/exq/adapters/queue/{gen_server.ex => async.ex} | 2 +- lib/exq/adapters/queue/{test.ex => fake.ex} | 2 +- lib/exq/support/config.ex | 2 +- test/adapters_test.exs | 14 +++++++------- 4 files changed, 10 insertions(+), 10 deletions(-) rename lib/exq/adapters/queue/{gen_server.ex => async.ex} (96%) rename lib/exq/adapters/queue/{test.ex => fake.ex} (95%) diff --git a/lib/exq/adapters/queue/gen_server.ex b/lib/exq/adapters/queue/async.ex similarity index 96% rename from lib/exq/adapters/queue/gen_server.ex rename to lib/exq/adapters/queue/async.ex index 8e5c5ecf..5f7279bc 100644 --- a/lib/exq/adapters/queue/gen_server.ex +++ b/lib/exq/adapters/queue/async.ex @@ -1,4 +1,4 @@ -defmodule Exq.Adapters.Queue.GenServer do +defmodule Exq.Adapters.Queue.Async do @moduledoc false alias Exq.Support.Config diff --git a/lib/exq/adapters/queue/test.ex b/lib/exq/adapters/queue/fake.ex similarity index 95% rename from lib/exq/adapters/queue/test.ex rename to lib/exq/adapters/queue/fake.ex index c679654d..4ecf6e63 100644 --- a/lib/exq/adapters/queue/test.ex +++ b/lib/exq/adapters/queue/fake.ex @@ -1,4 +1,4 @@ -defmodule Exq.Adapters.Queue.Test do +defmodule Exq.Adapters.Queue.Fake do @moduledoc false @behaviour Exq.Adapters.Queue diff --git a/lib/exq/support/config.ex b/lib/exq/support/config.ex index 6ce018b1..f30c574b 100644 --- a/lib/exq/support/config.ex +++ b/lib/exq/support/config.ex @@ -31,7 +31,7 @@ defmodule Exq.Support.Config do Exq.Middleware.Manager, Exq.Middleware.Logger ], - queue_adapter: Exq.Adapters.Queue.GenServer + queue_adapter: Exq.Adapters.Queue.Async } def get(key) do diff --git a/test/adapters_test.exs b/test/adapters_test.exs index b95410ea..9ef62b3d 100644 --- a/test/adapters_test.exs +++ b/test/adapters_test.exs @@ -1,7 +1,7 @@ defmodule AdaptersTest do use ExUnit.Case - alias Exq.Adapters.Queue.Test, as: QueueTest + alias Exq.Adapters.Queue.Fake defmodule EchoWorker do def perform(value), do: value @@ -9,27 +9,27 @@ defmodule AdaptersTest do describe "Exq.Adapters.Queue.Test" do test "enqueue/5 should return the correct value" do - assert {:ok, 1} == QueueTest.enqueue(nil, nil, EchoWorker, [1], nil) + assert {:ok, 1} == Fake.enqueue(nil, nil, EchoWorker, [1], nil) end test "enqueue/6 should return the correct value" do - assert {:ok, 2} == QueueTest.enqueue(nil, nil, nil, EchoWorker, [2], nil) + assert {:ok, 2} == Fake.enqueue(nil, nil, nil, EchoWorker, [2], nil) end test "enqueue_at/6 should return the correct value" do - assert {:ok, 3} == QueueTest.enqueue_at(nil, nil, nil, EchoWorker, [3], nil) + assert {:ok, 3} == Fake.enqueue_at(nil, nil, nil, EchoWorker, [3], nil) end test "enqueue_at/7 should return the correct value" do - assert {:ok, 4} == QueueTest.enqueue_at(nil, nil, nil, nil, EchoWorker, [4], nil) + assert {:ok, 4} == Fake.enqueue_at(nil, nil, nil, nil, EchoWorker, [4], nil) end test "enqueue_in/6 should return the correct value" do - assert {:ok, 5} == QueueTest.enqueue_in(nil, nil, nil, EchoWorker, [5], nil) + assert {:ok, 5} == Fake.enqueue_in(nil, nil, nil, EchoWorker, [5], nil) end test "enqueue_in/7 should return the correct value" do - assert {:ok, 6} == QueueTest.enqueue_in(nil, nil, nil, nil, EchoWorker, [6], nil) + assert {:ok, 6} == Fake.enqueue_in(nil, nil, nil, nil, EchoWorker, [6], nil) end end end From 5e2891c4100985f743b7a7338386e2f70a1b5d7f Mon Sep 17 00:00:00 2001 From: samidarko Date: Sat, 11 May 2019 17:28:06 +0800 Subject: [PATCH 4/8] doc: queue behaviour and concrete implementation + typespecs --- lib/exq/adapters/queue.ex | 53 ++++++++++++++++++++++++++++----- lib/exq/adapters/queue/async.ex | 6 +++- lib/exq/adapters/queue/fake.ex | 6 +++- test/config_test.exs | 1 - test/worker_test.exs | 4 +-- 5 files changed, 57 insertions(+), 13 deletions(-) diff --git a/lib/exq/adapters/queue.ex b/lib/exq/adapters/queue.ex index 72bb491e..dd60d184 100644 --- a/lib/exq/adapters/queue.ex +++ b/lib/exq/adapters/queue.ex @@ -1,10 +1,47 @@ defmodule Exq.Adapters.Queue do - @moduledoc false - - @callback enqueue(any(), String.t(), module(), list(), list()) :: tuple() - @callback enqueue(any(), any(), String.t(), module(), list(), list()) :: tuple() - @callback enqueue_at(any(), String.t(), any(), module(), list(), list()) :: tuple() - @callback enqueue_at(any(), any(), String.t(), any(), module(), list(), list()) :: tuple() - @callback enqueue_in(any(), String.t(), any(), module(), list(), list()) :: tuple() - @callback enqueue_in(any(), any(), String.t(), any(), module(), list(), list()) :: tuple() + @moduledoc ~S""" + Behaviour for creating Exq queue adapters + + ## Example + defmodule Exq.Adapters.Queue.CustomAdapter do + @behaviour Exq.Adapters.Queue + def enqueue(pid, queue, worker, args, options) do + {:ok, apply(worker, :perform, args)} + end + + def enqueue(pid, from, queue, worker, args, options) do + enqueue_somehow(pid, from, queue, worker, args, options) + end + + def enqueue_at(pid, queue, time, worker, args, options) do + enqueue_somehow(pid, queue, time, worker, args, options) + end + + def enqueue_at(pid, from, queue, time, worker, args, options) do + enqueue_at_somehow(pid, from, queue, time, worker, args, options) + end + + def enqueue_in(pid, queue, offset, worker, args, options) do + enqueue_in_somehow(pid, queue, offset, worker, args, options) + end + + def enqueue_in(pid, from, queue, offset, worker, args, options) do + enqueue_in_somehow(pid, from, queue, offset, worker, args, options) + end + end + """ + + @typedoc "The GenServer name" + @type name :: atom | {:global, term} | {:via, module, term} + + @typedoc "The server reference" + @type server :: pid | name | {atom, node} + + @callback enqueue(server, String.t(), module(), list(), list()) :: tuple() + @callback enqueue(server, pid(), String.t(), module(), list(), list()) :: tuple() + @callback enqueue_at(server, String.t(), DateTime.t(), module(), list(), list()) :: tuple() + @callback enqueue_at(server, pid(), String.t(), DateTime.t(), module(), list(), list()) :: + tuple() + @callback enqueue_in(server, String.t(), integer(), module(), list(), list()) :: tuple() + @callback enqueue_in(server, pid(), String.t(), integer(), module(), list(), list()) :: tuple() end diff --git a/lib/exq/adapters/queue/async.ex b/lib/exq/adapters/queue/async.ex index 5f7279bc..23962722 100644 --- a/lib/exq/adapters/queue/async.ex +++ b/lib/exq/adapters/queue/async.ex @@ -1,5 +1,9 @@ defmodule Exq.Adapters.Queue.Async do - @moduledoc false + @moduledoc """ + Asynchronous queue. Enqueue the job by using the GenServer API. + + Default queue. Designed to be used in production. + """ alias Exq.Support.Config @behaviour Exq.Adapters.Queue diff --git a/lib/exq/adapters/queue/fake.ex b/lib/exq/adapters/queue/fake.ex index 4ecf6e63..aa7e4c43 100644 --- a/lib/exq/adapters/queue/fake.ex +++ b/lib/exq/adapters/queue/fake.ex @@ -1,5 +1,9 @@ defmodule Exq.Adapters.Queue.Fake do - @moduledoc false + @moduledoc """ + Fake queue. Do not enqueue the job but execute it immediately. + + Designed to be used when testing your application. + """ @behaviour Exq.Adapters.Queue diff --git a/test/config_test.exs b/test/config_test.exs index 93612c64..dbff39ac 100644 --- a/test/config_test.exs +++ b/test/config_test.exs @@ -1,7 +1,6 @@ defmodule Exq.ConfigTest do use ExUnit.Case require Mix.Config - import ExqTestUtil setup_all do ExqTestUtil.reset_config() diff --git a/test/worker_test.exs b/test/worker_test.exs index 526702d1..1377775d 100644 --- a/test/worker_test.exs +++ b/test/worker_test.exs @@ -111,7 +111,7 @@ defmodule WorkerTest do def connected( :cast, {:pipeline, [["ZADD" | _], ["ZREMRANGEBYSCORE" | _], ["ZREMRANGEBYRANK" | _]], from, - timeout}, + _timeout}, data ) do send(:workertest, :zadd_redis) @@ -120,7 +120,7 @@ defmodule WorkerTest do end # Same reply as Redix connection - def connected(:cast, {:pipeline, [["LREM" | _]], from, timeout}, data) do + def connected(:cast, {:pipeline, [["LREM" | _]], from, _timeout}, data) do send(:workertest, :lrem_redis) reply(from, {:ok, [1]}) {:keep_state, data} From f21104c5abf219b21597fc256c730b75ccd3cd92 Mon Sep 17 00:00:00 2001 From: Anantha Kumaran Date: Sun, 3 Nov 2019 12:29:39 +0530 Subject: [PATCH 5/8] rename adapter names * add mock api which should be able to support multiple modes with `async: true` enabled --- config/test.exs | 3 +- lib/exq/adapters/queue/fake.ex | 33 ----- lib/exq/adapters/queue/mock.ex | 13 ++ lib/exq/adapters/queue/{async.ex => redis.ex} | 4 +- lib/exq/mock.ex | 115 ++++++++++++++++++ lib/exq/support/config.ex | 2 +- test/adapters_test.exs | 35 ------ test/inline_adapter_test.exs | 25 ++++ test/test_helper.exs | 2 + 9 files changed, 160 insertions(+), 72 deletions(-) delete mode 100644 lib/exq/adapters/queue/fake.ex create mode 100644 lib/exq/adapters/queue/mock.ex rename lib/exq/adapters/queue/{async.ex => redis.ex} (87%) create mode 100644 lib/exq/mock.ex delete mode 100644 test/adapters_test.exs create mode 100644 test/inline_adapter_test.exs diff --git a/config/test.exs b/config/test.exs index ac7e9262..366576b5 100644 --- a/config/test.exs +++ b/config/test.exs @@ -19,4 +19,5 @@ config :exq, max_retries: 0, stats_flush_interval: 5, stats_batch_size: 1, - middleware: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager] + middleware: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager], + queue_adapter: Exq.Adapters.Queue.Mock diff --git a/lib/exq/adapters/queue/fake.ex b/lib/exq/adapters/queue/fake.ex deleted file mode 100644 index aa7e4c43..00000000 --- a/lib/exq/adapters/queue/fake.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Exq.Adapters.Queue.Fake do - @moduledoc """ - Fake queue. Do not enqueue the job but execute it immediately. - - Designed to be used when testing your application. - """ - - @behaviour Exq.Adapters.Queue - - def enqueue(_pid, _queue, worker, args, _options) do - {:ok, apply(worker, :perform, args)} - end - - def enqueue(pid, _from, queue, worker, args, options) do - enqueue(pid, queue, worker, args, options) - end - - def enqueue_at(pid, queue, _time, worker, args, options) do - enqueue(pid, queue, worker, args, options) - end - - def enqueue_at(pid, _from, queue, _time, worker, args, options) do - enqueue(pid, queue, worker, args, options) - end - - def enqueue_in(pid, queue, _offset, worker, args, options) do - enqueue(pid, queue, worker, args, options) - end - - def enqueue_in(pid, _from, queue, _offset, worker, args, options) do - enqueue(pid, queue, worker, args, options) - end -end diff --git a/lib/exq/adapters/queue/mock.ex b/lib/exq/adapters/queue/mock.ex new file mode 100644 index 00000000..7fcbf196 --- /dev/null +++ b/lib/exq/adapters/queue/mock.ex @@ -0,0 +1,13 @@ +defmodule Exq.Adapters.Queue.Mock do + @moduledoc """ + Mock queue. Designed to be used when testing your application. + """ + + @behaviour Exq.Adapters.Queue + + defdelegate enqueue(pid, queue, worker, args, options), to: Exq.Mock + + defdelegate enqueue_at(pid, queue, time, worker, args, options), to: Exq.Mock + + defdelegate enqueue_in(pid, queue, offset, worker, args, options), to: Exq.Mock +end diff --git a/lib/exq/adapters/queue/async.ex b/lib/exq/adapters/queue/redis.ex similarity index 87% rename from lib/exq/adapters/queue/async.ex rename to lib/exq/adapters/queue/redis.ex index 9d14d4f1..7e048aec 100644 --- a/lib/exq/adapters/queue/async.ex +++ b/lib/exq/adapters/queue/redis.ex @@ -1,6 +1,6 @@ -defmodule Exq.Adapters.Queue.Async do +defmodule Exq.Adapters.Queue.Redis do @moduledoc """ - Asynchronous queue. Enqueue the job by using the GenServer API. + Redis based Asynchronous queue. Enqueue the job by using the GenServer API. Default queue. Designed to be used in production. """ diff --git a/lib/exq/mock.ex b/lib/exq/mock.ex new file mode 100644 index 00000000..73d7529d --- /dev/null +++ b/lib/exq/mock.ex @@ -0,0 +1,115 @@ +defmodule Exq.Mock do + alias Exq.Support.Config + alias Exq.Adapters.Queue.Redis + use GenServer + @timeout 30000 + + defmodule State do + defstruct default_mode: :redis, jobs: %{}, modes: %{} + end + + ### Public api + + def start_link(options \\ []) do + queue_adapter = Config.get(:queue_adapter) + + if queue_adapter != Exq.Adapters.Queue.Mock do + raise RuntimeError, """ + Exq.Mock can only work if queue_adapter is set to Exq.Adapters.Queue.Mock + Add the following to your test config + config :exq, queue_adapter: Exq.Adapters.Queue.Mock + """ + end + + GenServer.start_link(__MODULE__, options, name: __MODULE__) + end + + def set_mode(mode) when mode in [:redis, :inline] do + GenServer.call(__MODULE__, {:mode, self(), mode}, @timeout) + end + + ### Private + + @impl true + def init(options) do + {:ok, %State{default_mode: Keyword.get(options, :mode, :redis)}} + end + + def enqueue(pid, queue, worker, args, options) do + {:ok, runnable} = + GenServer.call( + __MODULE__, + {:enqueue, self(), :enqueue, [pid, queue, worker, args, options]}, + @timeout + ) + + runnable.() + end + + def enqueue_at(pid, queue, time, worker, args, options) do + {:ok, runnable} = + GenServer.call( + __MODULE__, + {:enqueue, self(), :enqueue_at, [pid, queue, time, worker, args, options]}, + @timeout + ) + + runnable.() + end + + def enqueue_in(pid, queue, offset, worker, args, options) do + {:ok, runnable} = + GenServer.call( + __MODULE__, + {:enqueue, self(), :enqueue_in, [pid, queue, offset, worker, args, options]}, + @timeout + ) + + runnable.() + end + + @impl true + def handle_call({:enqueue, owner_pid, type, args}, _from, state) do + state = maybe_add_and_monitor_pid(state, owner_pid) + + runnable = + case state.modes[owner_pid] do + :redis -> + fn -> apply(Redis, type, args) end + + :inline -> + fn -> + jid = UUID.uuid4() + + case args do + [_pid, _queue, worker, args, _options] -> + apply(worker, :perform, args) + + [_pid, _queue, _time_or_offset, worker, args, _options] -> + apply(worker, :perform, args) + end + + {:ok, jid} + end + end + + {:reply, {:ok, runnable}, state} + end + + def handle_call({:mode, owner_pid, mode}, _from, state) do + state = put_in(state.modes[owner_pid], mode) + {:reply, :ok, state} + end + + defp maybe_add_and_monitor_pid(state, pid) do + case state.modes do + %{^pid => _mode} -> + state + + _ -> + Process.monitor(pid) + state = put_in(state.modes[pid], state.default_mode) + state + end + end +end diff --git a/lib/exq/support/config.ex b/lib/exq/support/config.ex index f30c574b..2c7332fa 100644 --- a/lib/exq/support/config.ex +++ b/lib/exq/support/config.ex @@ -31,7 +31,7 @@ defmodule Exq.Support.Config do Exq.Middleware.Manager, Exq.Middleware.Logger ], - queue_adapter: Exq.Adapters.Queue.Async + queue_adapter: Exq.Adapters.Queue.Redis } def get(key) do diff --git a/test/adapters_test.exs b/test/adapters_test.exs deleted file mode 100644 index 9ef62b3d..00000000 --- a/test/adapters_test.exs +++ /dev/null @@ -1,35 +0,0 @@ -defmodule AdaptersTest do - use ExUnit.Case - - alias Exq.Adapters.Queue.Fake - - defmodule EchoWorker do - def perform(value), do: value - end - - describe "Exq.Adapters.Queue.Test" do - test "enqueue/5 should return the correct value" do - assert {:ok, 1} == Fake.enqueue(nil, nil, EchoWorker, [1], nil) - end - - test "enqueue/6 should return the correct value" do - assert {:ok, 2} == Fake.enqueue(nil, nil, nil, EchoWorker, [2], nil) - end - - test "enqueue_at/6 should return the correct value" do - assert {:ok, 3} == Fake.enqueue_at(nil, nil, nil, EchoWorker, [3], nil) - end - - test "enqueue_at/7 should return the correct value" do - assert {:ok, 4} == Fake.enqueue_at(nil, nil, nil, nil, EchoWorker, [4], nil) - end - - test "enqueue_in/6 should return the correct value" do - assert {:ok, 5} == Fake.enqueue_in(nil, nil, nil, EchoWorker, [5], nil) - end - - test "enqueue_in/7 should return the correct value" do - assert {:ok, 6} == Fake.enqueue_in(nil, nil, nil, nil, EchoWorker, [6], nil) - end - end -end diff --git a/test/inline_adapter_test.exs b/test/inline_adapter_test.exs new file mode 100644 index 00000000..2864b3e2 --- /dev/null +++ b/test/inline_adapter_test.exs @@ -0,0 +1,25 @@ +defmodule InlineAdapterTest do + use ExUnit.Case, async: true + + defmodule EchoWorker do + def perform(value), do: value + end + + setup do + Exq.Mock.set_mode(:inline) + end + + describe "inline mode" do + test "enqueue should return the correct value" do + assert {:ok, _} = Exq.enqueue(Exq, "low", EchoWorker, [1]) + end + + test "enqueue_at should return the correct value" do + assert {:ok, _} = Exq.enqueue_at(Exq, "low", DateTime.utc_now(), EchoWorker, [1]) + end + + test "enqueue_in should return the correct value" do + assert {:ok, _} = Exq.enqueue_in(Exq, "low", 300, EchoWorker, [1]) + end + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index bc585b4e..83855abc 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -170,4 +170,6 @@ System.at_exit(fn _status -> TestRedis.stop() end) +Exq.Mock.start_link(mode: :redis) + ExUnit.start(capture_log: true) From 354da82e9f2f2835b2bd0c7cf035789e6342e90a Mon Sep 17 00:00:00 2001 From: Anantha Kumaran Date: Sun, 3 Nov 2019 13:17:00 +0530 Subject: [PATCH 6/8] clear state once the owner process exits --- lib/exq/mock.ex | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/exq/mock.ex b/lib/exq/mock.ex index 73d7529d..0396854e 100644 --- a/lib/exq/mock.ex +++ b/lib/exq/mock.ex @@ -70,7 +70,7 @@ defmodule Exq.Mock do @impl true def handle_call({:enqueue, owner_pid, type, args}, _from, state) do - state = maybe_add_and_monitor_pid(state, owner_pid) + state = maybe_add_and_monitor_pid(state, owner_pid, state.default_mode) runnable = case state.modes[owner_pid] do @@ -97,18 +97,24 @@ defmodule Exq.Mock do end def handle_call({:mode, owner_pid, mode}, _from, state) do - state = put_in(state.modes[owner_pid], mode) + state = maybe_add_and_monitor_pid(state, owner_pid, mode) {:reply, :ok, state} end - defp maybe_add_and_monitor_pid(state, pid) do + @impl true + def handle_info({:DOWN, _, _, pid, _}, state) do + {_, state} = pop_in(state.modes[pid]) + {:noreply, state} + end + + defp maybe_add_and_monitor_pid(state, pid, mode) do case state.modes do %{^pid => _mode} -> state _ -> Process.monitor(pid) - state = put_in(state.modes[pid], state.default_mode) + state = put_in(state.modes[pid], mode) state end end From 4b7f10599e9543f55f397fd4c9086322bff7b508 Mon Sep 17 00:00:00 2001 From: Anantha Kumaran Date: Sun, 3 Nov 2019 13:43:59 +0530 Subject: [PATCH 7/8] add fake mode --- lib/exq/mock.ex | 69 ++++++++++++++----- test/fake_mode_test.exs | 40 +++++++++++ ..._adapter_test.exs => inline_mode_test.exs} | 2 +- 3 files changed, 92 insertions(+), 19 deletions(-) create mode 100644 test/fake_mode_test.exs rename test/{inline_adapter_test.exs => inline_mode_test.exs} (95%) diff --git a/lib/exq/mock.ex b/lib/exq/mock.ex index 0396854e..20d8e9b0 100644 --- a/lib/exq/mock.ex +++ b/lib/exq/mock.ex @@ -1,6 +1,7 @@ defmodule Exq.Mock do alias Exq.Support.Config alias Exq.Adapters.Queue.Redis + alias Exq.Support.Job use GenServer @timeout 30000 @@ -24,10 +25,14 @@ defmodule Exq.Mock do GenServer.start_link(__MODULE__, options, name: __MODULE__) end - def set_mode(mode) when mode in [:redis, :inline] do + def set_mode(mode) when mode in [:redis, :inline, :fake] do GenServer.call(__MODULE__, {:mode, self(), mode}, @timeout) end + def jobs do + GenServer.call(__MODULE__, {:jobs, self()}, @timeout) + end + ### Private @impl true @@ -72,28 +77,30 @@ defmodule Exq.Mock do def handle_call({:enqueue, owner_pid, type, args}, _from, state) do state = maybe_add_and_monitor_pid(state, owner_pid, state.default_mode) - runnable = - case state.modes[owner_pid] do - :redis -> - fn -> apply(Redis, type, args) end + case state.modes[owner_pid] do + :redis -> + runnable = fn -> apply(Redis, type, args) end + {:reply, {:ok, runnable}, state} - :inline -> - fn -> - jid = UUID.uuid4() + :inline -> + runnable = fn -> + job = to_job(args) + apply(job.class, :perform, job.args) + {:ok, job.jid} + end - case args do - [_pid, _queue, worker, args, _options] -> - apply(worker, :perform, args) + {:reply, {:ok, runnable}, state} - [_pid, _queue, _time_or_offset, worker, args, _options] -> - apply(worker, :perform, args) - end + :fake -> + job = to_job(args) + state = update_in(state.jobs[owner_pid], &((&1 || []) ++ [job])) - {:ok, jid} - end - end + runnable = fn -> + {:ok, job.jid} + end - {:reply, {:ok, runnable}, state} + {:reply, {:ok, runnable}, state} + end end def handle_call({:mode, owner_pid, mode}, _from, state) do @@ -101,12 +108,38 @@ defmodule Exq.Mock do {:reply, :ok, state} end + def handle_call({:jobs, owner_pid}, _from, state) do + jobs = state.jobs[owner_pid] || [] + {:reply, jobs, state} + end + @impl true def handle_info({:DOWN, _, _, pid, _}, state) do {_, state} = pop_in(state.modes[pid]) + {_, state} = pop_in(state.jobs[pid]) {:noreply, state} end + defp to_job([_pid, queue, worker, args, _options]) do + %Job{ + jid: UUID.uuid4(), + queue: queue, + class: worker, + args: args, + enqueued_at: DateTime.utc_now() + } + end + + defp to_job([_pid, queue, _time_or_offset, worker, args, _options]) do + %Job{ + jid: UUID.uuid4(), + queue: queue, + class: worker, + args: args, + enqueued_at: DateTime.utc_now() + } + end + defp maybe_add_and_monitor_pid(state, pid, mode) do case state.modes do %{^pid => _mode} -> diff --git a/test/fake_mode_test.exs b/test/fake_mode_test.exs new file mode 100644 index 00000000..06fef32b --- /dev/null +++ b/test/fake_mode_test.exs @@ -0,0 +1,40 @@ +defmodule FakeModeTest do + use ExUnit.Case, async: true + + defmodule BrokenWorker do + def perform(_) do + raise RuntimeError, "Unexpected" + end + end + + setup do + Exq.Mock.set_mode(:fake) + end + + describe "fake mode" do + test "enqueue should " do + assert [] = Exq.Mock.jobs() + assert {:ok, _} = Exq.enqueue(Exq, "low", BrokenWorker, [1]) + assert {:ok, _} = Exq.enqueue_at(Exq, "low", DateTime.utc_now(), BrokenWorker, [2]) + assert {:ok, _} = Exq.enqueue_in(Exq, "low", 300, BrokenWorker, [3]) + + assert [ + %Exq.Support.Job{ + args: [1], + class: FakeModeTest.BrokenWorker, + queue: "low" + }, + %Exq.Support.Job{ + args: [2], + class: FakeModeTest.BrokenWorker, + queue: "low" + }, + %Exq.Support.Job{ + args: [3], + class: FakeModeTest.BrokenWorker, + queue: "low" + } + ] = Exq.Mock.jobs() + end + end +end diff --git a/test/inline_adapter_test.exs b/test/inline_mode_test.exs similarity index 95% rename from test/inline_adapter_test.exs rename to test/inline_mode_test.exs index 2864b3e2..412b5a53 100644 --- a/test/inline_adapter_test.exs +++ b/test/inline_mode_test.exs @@ -1,4 +1,4 @@ -defmodule InlineAdapterTest do +defmodule InlineModeTest do use ExUnit.Case, async: true defmodule EchoWorker do From 83e28afb3725597874a623d95f04bab3a9bd0b94 Mon Sep 17 00:00:00 2001 From: Anantha Kumaran Date: Sat, 16 Nov 2019 19:36:20 +0530 Subject: [PATCH 8/8] Document mock api --- README.md | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/README.md b/README.md index a1bb5433..9f3788eb 100644 --- a/README.md +++ b/README.md @@ -457,6 +457,40 @@ By default, Exq will register itself under the ```Elixir.Exq``` atom. You can c {:ok, exq} = Exq.start_link(name: Exq.Custom) ``` +## Testing + +`Exq.Mock` module which provides few options to test your workers. + +```elixir +# change queue_adapter in config/test.exs +config :exq, + queue_adapter: Exq.Adapters.Queue.Mock + +# start mock server in your test_helper.exs +Exq.Mock.start_link(mode: :redis) +``` + +`Exq.Mock` currently supports three modes. The default mode can provided +on the `Exq.Mock.start_link` call. The mode could be overriden for +each test by calling `Exq.Mock.set_mode(:fake)` + +### redis + +This could be used for integration testing. Doesn't support `async: +true` option. + +### fake + +The jobs get enqueued in a local queue and never get +executed. `Exq.Mock.jobs()` returns all the jobs. Supports `async: +true` option. + +### inline + +The jobs get executed in the same process. Supports `async: true`. + + + ## Donation To donate, send to: