Skip to content

Commit

Permalink
Merge 4c1a8de into 79e3b86
Browse files Browse the repository at this point in the history
  • Loading branch information
ananthakumaran committed Oct 3, 2019
2 parents 79e3b86 + 4c1a8de commit fd71c40
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 94 deletions.
14 changes: 14 additions & 0 deletions bench.exs
@@ -0,0 +1,14 @@
defmodule BenchmarkWorker do
def perform() do
end
end

{:ok, _} = Application.ensure_all_started(:exq)
Logger.configure(level: :warn)

Benchee.run(
%{
"enqueue" => fn -> {:ok, _} = Exq.enqueue(Exq, "default", BenchmarkWorker, []) end
},
parallel: 100
)
46 changes: 7 additions & 39 deletions lib/exq/enqueue_api.ex
Expand Up @@ -9,6 +9,7 @@ defmodule Exq.Enqueuer.EnqueueApi do
defmacro __using__(_) do
quote location: :keep do
alias Exq.Support.Config
alias Exq.Redis.JobQueue

@default_options []
@doc """
Expand All @@ -28,20 +29,9 @@ defmodule Exq.Enqueuer.EnqueueApi do
def enqueue(pid, queue, worker, args),
do: enqueue(pid, queue, worker, args, @default_options)

def enqueue(pid, from, queue, worker, args) when is_pid(from) do
enqueue(pid, from, queue, worker, args, @default_options)
end

def enqueue(pid, queue, worker, args, options) do
GenServer.call(
pid,
{:enqueue, queue, worker, args, options},
Config.get(:genserver_timeout)
)
end

def enqueue(pid, from, queue, worker, args, options) do
GenServer.cast(pid, {:enqueue, from, queue, worker, args, options})
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
JobQueue.enqueue(redis, namespace, queue, worker, args, options)
end

@doc """
Expand All @@ -59,20 +49,9 @@ defmodule Exq.Enqueuer.EnqueueApi do
def enqueue_at(pid, queue, time, worker, args),
do: enqueue_at(pid, queue, time, worker, args, @default_options)

def enqueue_at(pid, from, queue, time, worker, args) when is_pid(from) do
enqueue_at(pid, from, queue, time, worker, args, @default_options)
end

def enqueue_at(pid, queue, time, worker, args, options) do
GenServer.call(
pid,
{:enqueue_at, queue, time, worker, args, options},
Config.get(:genserver_timeout)
)
end

def enqueue_at(pid, from, queue, time, worker, args, options) do
GenServer.cast(pid, {:enqueue_at, from, queue, time, worker, args, options})
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
JobQueue.enqueue_at(redis, namespace, queue, time, worker, args, options)
end

@doc """
Expand All @@ -90,20 +69,9 @@ defmodule Exq.Enqueuer.EnqueueApi do
def enqueue_in(pid, queue, offset, worker, args),
do: enqueue_in(pid, queue, offset, worker, args, @default_options)

def enqueue_in(pid, from, queue, offset, worker, args) when is_pid(from) do
enqueue_in(pid, from, queue, offset, worker, args, @default_options)
end

def enqueue_in(pid, queue, offset, worker, args, options) do
GenServer.call(
pid,
{:enqueue_in, queue, offset, worker, args, options},
Config.get(:genserver_timeout)
)
end

def enqueue_in(pid, from, queue, offset, worker, args, options) do
GenServer.cast(pid, {:enqueue_in, from, queue, offset, worker, args, options})
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
JobQueue.enqueue_in(redis, namespace, queue, offset, worker, args, options)
end
end
end
Expand Down
42 changes: 2 additions & 40 deletions lib/exq/enqueuer/server.ex
Expand Up @@ -16,7 +16,6 @@ defmodule Exq.Enqueuer.Server do
require Logger

alias Exq.Support.Config
alias Exq.Redis.JobQueue
use GenServer

defmodule State do
Expand All @@ -35,45 +34,8 @@ defmodule Exq.Enqueuer.Server do
{:ok, %State{redis: opts[:redis], namespace: opts[:namespace]}}
end

def handle_cast({:enqueue, from, queue, worker, args, options}, state) do
response = JobQueue.enqueue(state.redis, state.namespace, queue, worker, args, options)
GenServer.reply(from, response)
{:noreply, state}
end

def handle_cast({:enqueue_at, from, queue, time, worker, args, options}, state) do
response =
JobQueue.enqueue_at(state.redis, state.namespace, queue, time, worker, args, options)

GenServer.reply(from, response)
{:noreply, state}
end

def handle_cast({:enqueue_in, from, queue, offset, worker, args, options}, state) do
response =
JobQueue.enqueue_in(state.redis, state.namespace, queue, offset, worker, args, options)

GenServer.reply(from, response)
{:noreply, state}
end

def handle_call({:enqueue, queue, worker, args, options}, _from, state) do
response = JobQueue.enqueue(state.redis, state.namespace, queue, worker, args, options)
{:reply, response, state}
end

def handle_call({:enqueue_at, queue, time, worker, args, options}, _from, state) do
response =
JobQueue.enqueue_at(state.redis, state.namespace, queue, time, worker, args, options)

{:reply, response, state}
end

def handle_call({:enqueue_in, queue, offset, worker, args, options}, _from, state) do
response =
JobQueue.enqueue_in(state.redis, state.namespace, queue, offset, worker, args, options)

{:reply, response, state}
def handle_call(:redis, _from, state) do
{:reply, {state.redis, state.namespace}, state}
end

def terminate(_reason, _state) do
Expand Down
16 changes: 2 additions & 14 deletions lib/exq/manager/server.ex
Expand Up @@ -110,7 +110,6 @@ defmodule Exq.Manager.Server do

require Logger
use GenServer
alias Exq.Enqueuer
alias Exq.Support.Config
alias Exq.Redis.JobQueue

Expand Down Expand Up @@ -175,19 +174,8 @@ defmodule Exq.Manager.Server do
{:ok, state, 0}
end

def handle_call({:enqueue, queue, worker, args, options}, from, state) do
Enqueuer.enqueue(state.enqueuer, from, queue, worker, args, options)
{:noreply, state, 10}
end

def handle_call({:enqueue_at, queue, time, worker, args, options}, from, state) do
Enqueuer.enqueue_at(state.enqueuer, from, queue, time, worker, args, options)
{:noreply, state, 10}
end

def handle_call({:enqueue_in, queue, offset, worker, args, options}, from, state) do
Enqueuer.enqueue_in(state.enqueuer, from, queue, offset, worker, args, options)
{:noreply, state, 10}
def handle_call(:redis, _from, state) do
{:reply, {state.redis, state.namespace}, state, 10}
end

def handle_call(:subscriptions, _from, state) do
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Expand Up @@ -48,6 +48,7 @@ defmodule Exq.Mixfile do
# docs
{:ex_doc, "~> 0.19", only: :dev},
{:earmark, "~> 1.0", only: :dev},
{:benchee, "~> 1.0", only: :dev},
{:ranch, "~> 1.6", only: :test, override: true}
]
end
Expand Down
4 changes: 3 additions & 1 deletion mix.lock
@@ -1,5 +1,7 @@
%{
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm"},
"certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.3.1", "73812f447f7a42358d3ba79283cfa3075a7580a3a2ed457616d6517ac3738cb9", [:mix], []},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.0", "ff26e938f95830b1db152cb6e594d711c10c02c6391236900ddd070a6b01271d", [:mix], []},
"ex_doc": {:hex, :ex_doc, "0.19.3", "3c7b0f02851f5fc13b040e8e925051452e41248f685e40250d7e40b07b9f8c10", [:mix], [{:earmark, "~> 1.2", [hex: :earmark, optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, optional: false]}]},
Expand All @@ -17,6 +19,6 @@
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], []},
"poison": {:hex, :poison, "2.1.0", "f583218ced822675e484648fa26c933d621373f01c6c76bd00005d7bd4b82e27", [:mix], []},
"ranch": {:hex, :ranch, "1.7.0", "9583f47160ca62af7f8d5db11454068eaa32b56eeadf984d4f46e61a076df5f2", [:rebar3], []},
"redix": {:hex, :redix, "0.9.0", "c631c921354587e054bf1e2a6b7d0120d617352fc42b624b9ca79ea484d0326c", [:mix], []},
"redix": {:hex, :redix, "0.9.0", "c631c921354587e054bf1e2a6b7d0120d617352fc42b624b9ca79ea484d0326c", [:mix], [], "hexpm"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []},
}

0 comments on commit fd71c40

Please sign in to comment.