Skip to content

Commit

Permalink
Start Heartbeat server from Mode modul
Browse files Browse the repository at this point in the history
  • Loading branch information
TondaHack committed Feb 16, 2018
1 parent 82f5082 commit ea33b33
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 27 deletions.
39 changes: 18 additions & 21 deletions lib/exq/manager/heartbeat_server.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
defmodule Exq.Heartbeat.Server do
use GenServer
alias Exq.Redis.JobQueue
alias Exq.Redis.Connection
alias Exq.Support.Config
alias Exq.Support.Time
alias Exq.Redis.JobStat

defmodule State do
Expand All @@ -23,33 +21,32 @@ defmodule Exq.Heartbeat.Server do
queues: opts[:queues],
poll_timeout: opts[:poll_timeout]
}
schedule_work(state)
schedule_work(state, true)
{:ok, state}
end

def handle_cast({:heartbeat, master_state}, state) do
def handle_cast({:heartbeat, master_state, status}, state) do
schedule_work(state)
current_state = struct(State, Map.from_struct(master_state))
master_data = Map.from_struct(master_state)
current_state = %{struct(State, master_data) | name: state.name}
init_data = if status, do: [["DEL", "#{current_state.name}:workers"]], else: []
data = init_data ++ JobStat.get_redis_commands(
current_state.namespace,
current_state.node_id,
current_state.started_at,
current_state.pid,
current_state.queues,
current_state.work_table,
current_state.poll_timeout
)
Connection.qp!(
current_state.redis,
JobStat.get_redis_commands(
current_state.namespace,
current_state.node_id,
current_state.started_at,
current_state.pid,
current_state.queues,
current_state.work_table,
current_state.poll_timeout
)
)
data
)
{:noreply, current_state}
end

defp schedule_work(state) do
Process.send_after(state.name, {:get_state, self()}, 1000)
end

defp redis_worker_name(state) do
JobQueue.full_key(state.namespace, "#{state.node_id}:elixir")
defp schedule_work(state, status \\ false) do
Process.send_after(state.name, {:get_state, self(), status}, 1000)
end
end
10 changes: 5 additions & 5 deletions lib/exq/manager/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,6 @@ defmodule Exq.Manager.Server do
{:ok, state, 0}
end

def handle_info({:get_state, pid}, state) do
GenServer.cast(pid, {:heartbeat, state})
{:noreply, state}
end

def handle_call({:enqueue, queue, worker, args, options}, from, state) do
Enqueuer.enqueue(state.enqueuer, from, queue, worker, args, options)
{:noreply, state, 10}
Expand Down Expand Up @@ -227,6 +222,11 @@ defmodule Exq.Manager.Server do
{:noreply, updated_state, timeout}
end

def handle_info({:get_state, pid, status}, state) do
GenServer.cast(pid, {:heartbeat, state, status})
{:noreply, state}
end

def handle_info(_info, state) do
{:noreply, state, state.poll_timeout}
end
Expand Down
1 change: 0 additions & 1 deletion lib/exq/redis/job_stat.ex
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ defmodule Exq.Redis.JobStat do
def get_redis_commands(namespace, node_id, started_at, master_pid, queues, work_table, poll_timeout) do
name = redis_worker_name(namespace, node_id)
[
["DEL", "#{name}:workers"],
["SADD", JobQueue.full_key(namespace, "processes"), name],
["HSET", name, "quiet", "false"],
["HSET", name, "info", Poison.encode!(%{ hostname: node_id, started_at: started_at, pid: "#{:erlang.pid_to_list(master_pid)}", concurrency: cocurency_count(queues, work_table), queues: queues})],
Expand Down

0 comments on commit ea33b33

Please sign in to comment.