Skip to content

Commit

Permalink
Simplify heartbeat & add timout to call
Browse files Browse the repository at this point in the history
  • Loading branch information
TondaHack committed Apr 3, 2018
1 parent f32d0b5 commit a1be48c
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 38 deletions.
48 changes: 17 additions & 31 deletions lib/exq/manager/heartbeat_server.ex
Original file line number Diff line number Diff line change
@@ -1,52 +1,38 @@
defmodule Exq.Heartbeat.Server do
use GenServer
alias Exq.Redis.Connection
alias Exq.Support.Config
alias Exq.Redis.JobStat

defmodule State do
defstruct name: nil, node_id: nil, namespace: nil, started_at: nil, pid: nil, queues: nil, poll_timeout: nil, work_table: nil, redis: nil
end

def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end

def init(opts) do
state = %State{
name: Exq.Manager.Server.server_name(opts[:name]),
redis: opts[:redis],
node_id: Config.node_identifier.node_id(),
namespace: opts[:namespace],
queues: opts[:queues],
poll_timeout: opts[:poll_timeout]
}
schedule_work(state, true)
{:ok, state}
schedule_new_work(Exq.Manager.Server.server_name(opts[:name]))
{:ok}
end

def handle_cast({:heartbeat, master_state, status}, state) do
schedule_work(state)
master_data = Map.from_struct(master_state)
current_state = %{struct(State, master_data) | name: state.name}
init_data = if status, do: [["DEL", "#{current_state.name}:workers"]], else: []
def heartbeat(name, status) do
master_state = GenServer.call(name, :get_state)
init_data = if status, do: [["DEL", "#{master_state.name}:workers"]], else: []
data = init_data ++ JobStat.status_process_commands(
current_state.namespace,
current_state.node_id,
current_state.started_at,
current_state.pid,
current_state.queues,
current_state.work_table,
current_state.poll_timeout
master_state.namespace,
master_state.node_id,
master_state.started_at,
master_state.pid,
master_state.queues,
master_state.work_table,
master_state.poll_timeout
)
Connection.qp!(
current_state.redis,
master_state.redis,
data
)
{:noreply, current_state}
end

defp schedule_work(state, status \\ false) do
Process.send_after(state.name, {:get_state, self(), status}, 1000)
defp schedule_new_work(name, status \\ false) do
:timer.sleep(1000)
heartbeat(name, status)
schedule_new_work(name)
end
end
13 changes: 6 additions & 7 deletions lib/exq/manager/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ defmodule Exq.Manager.Server do
# Setup queues
work_table = setup_queues(opts)

state = %State{work_table: work_table,
state = %State{
work_table: work_table,
redis: opts[:redis],
stats: opts[:stats],
workers_sup: opts[:workers_sup],
Expand Down Expand Up @@ -205,6 +206,10 @@ defmodule Exq.Manager.Server do
{:reply, :ok, updated_state, 0}
end

def handle_call(:get_state, _from, state) do
{:reply, state, state, state.poll_timeout}
end

def handle_cast({:re_enqueue_backup, queue}, state) do
rescue_timeout(fn ->
JobQueue.re_enqueue_backup(state.redis, state.namespace, state.node_id, queue)
Expand All @@ -222,11 +227,6 @@ defmodule Exq.Manager.Server do
{:noreply, updated_state, timeout}
end

def handle_info({:get_state, pid, status}, state) do
GenServer.cast(pid, {:heartbeat, state, status})
{:noreply, state}
end

def handle_info(_info, state) do
{:noreply, state, state.poll_timeout}
end
Expand All @@ -246,7 +246,6 @@ defmodule Exq.Manager.Server do
def dequeue_and_dispatch(state, queues) do
rescue_timeout({state, state.poll_timeout}, fn ->
jobs = Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues)

job_results = jobs |> Enum.map(fn(potential_job) -> dispatch_job(state, potential_job) end)

cond do
Expand Down

0 comments on commit a1be48c

Please sign in to comment.