From a1be48ca50bf28815fbc6d8d5fc4f15754350481 Mon Sep 17 00:00:00 2001 From: Antonin Date: Tue, 3 Apr 2018 14:50:46 +0200 Subject: [PATCH] Simplify heartbeat & add timout to call --- lib/exq/manager/heartbeat_server.ex | 48 ++++++++++------------------- lib/exq/manager/server.ex | 13 ++++---- 2 files changed, 23 insertions(+), 38 deletions(-) diff --git a/lib/exq/manager/heartbeat_server.ex b/lib/exq/manager/heartbeat_server.ex index dd15a361..41f2ecfd 100644 --- a/lib/exq/manager/heartbeat_server.ex +++ b/lib/exq/manager/heartbeat_server.ex @@ -1,52 +1,38 @@ defmodule Exq.Heartbeat.Server do use GenServer alias Exq.Redis.Connection - alias Exq.Support.Config alias Exq.Redis.JobStat - defmodule State do - defstruct name: nil, node_id: nil, namespace: nil, started_at: nil, pid: nil, queues: nil, poll_timeout: nil, work_table: nil, redis: nil - end - def start_link(opts) do GenServer.start_link(__MODULE__, opts) end def init(opts) do - state = %State{ - name: Exq.Manager.Server.server_name(opts[:name]), - redis: opts[:redis], - node_id: Config.node_identifier.node_id(), - namespace: opts[:namespace], - queues: opts[:queues], - poll_timeout: opts[:poll_timeout] - } - schedule_work(state, true) - {:ok, state} + schedule_new_work(Exq.Manager.Server.server_name(opts[:name])) + {:ok} end - def handle_cast({:heartbeat, master_state, status}, state) do - schedule_work(state) - master_data = Map.from_struct(master_state) - current_state = %{struct(State, master_data) | name: state.name} - init_data = if status, do: [["DEL", "#{current_state.name}:workers"]], else: [] + def heartbeat(name, status) do + master_state = GenServer.call(name, :get_state) + init_data = if status, do: [["DEL", "#{master_state.name}:workers"]], else: [] data = init_data ++ JobStat.status_process_commands( - current_state.namespace, - current_state.node_id, - current_state.started_at, - current_state.pid, - current_state.queues, - current_state.work_table, - current_state.poll_timeout + master_state.namespace, + master_state.node_id, + master_state.started_at, + master_state.pid, + master_state.queues, + master_state.work_table, + master_state.poll_timeout ) Connection.qp!( - current_state.redis, + master_state.redis, data ) - {:noreply, current_state} end - defp schedule_work(state, status \\ false) do - Process.send_after(state.name, {:get_state, self(), status}, 1000) + defp schedule_new_work(name, status \\ false) do + :timer.sleep(1000) + heartbeat(name, status) + schedule_new_work(name) end end diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index b89eabed..d7c262f0 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -144,7 +144,8 @@ defmodule Exq.Manager.Server do # Setup queues work_table = setup_queues(opts) - state = %State{work_table: work_table, + state = %State{ + work_table: work_table, redis: opts[:redis], stats: opts[:stats], workers_sup: opts[:workers_sup], @@ -205,6 +206,10 @@ defmodule Exq.Manager.Server do {:reply, :ok, updated_state, 0} end + def handle_call(:get_state, _from, state) do + {:reply, state, state, state.poll_timeout} + end + def handle_cast({:re_enqueue_backup, queue}, state) do rescue_timeout(fn -> JobQueue.re_enqueue_backup(state.redis, state.namespace, state.node_id, queue) @@ -222,11 +227,6 @@ defmodule Exq.Manager.Server do {:noreply, updated_state, timeout} end - def handle_info({:get_state, pid, status}, state) do - GenServer.cast(pid, {:heartbeat, state, status}) - {:noreply, state} - end - def handle_info(_info, state) do {:noreply, state, state.poll_timeout} end @@ -246,7 +246,6 @@ defmodule Exq.Manager.Server do def dequeue_and_dispatch(state, queues) do rescue_timeout({state, state.poll_timeout}, fn -> jobs = Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues) - job_results = jobs |> Enum.map(fn(potential_job) -> dispatch_job(state, potential_job) end) cond do