/
command_listener.ex
51 lines (40 loc) · 1.64 KB
/
command_listener.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
defmodule CommandListener do
  @moduledoc """
  GenServer that consumes the command queue over AMQP.

  On start the process asks `ConnectionsHolder` for a connection. When a
  `{:connected, conn}` message arrives it opens a channel on that connection and
  subscribes to the queue. Each delivery is wrapped with `MessageToHandle.new/4`
  and passed to `CommandExecutor.handle_message/1` in a linked process.
  """

  use GenServer
  require Logger

  @handlers_table_name :command_handlers

  defstruct [:conn, :queue_name, :chan, :consumer_tag]

  @typedoc "Listener state: the connection, its channel, the queue and the consumer tag."
  @type t :: %__MODULE__{
          conn: term(),
          queue_name: term(),
          chan: term(),
          consumer_tag: term()
        }

  @doc """
  Starts the listener for the queue returned by `MessageContext.command_queue_name/0`.

  The argument is ignored.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_) do
    GenServer.start_link(__MODULE__, MessageContext.command_queue_name())
  end

  @impl true
  def init(queue) do
    # No channel is opened here: the connection is only requested, and the
    # channel/subscription are set up in the `{:connected, conn}` clause below.
    :ok = ConnectionsHolder.get_connection_async(__MODULE__)
    {:ok, %__MODULE__{queue_name: queue}}
  end

  # Connection available: open a channel and subscribe to the queue.
  # Both steps are matched assertively, so a failure crashes the process.
  @impl true
  def handle_info({:connected, conn}, %__MODULE__{queue_name: queue_name} = state) do
    {:ok, chan} = AMQP.Channel.open(conn)
    {:ok, consumer_tag} = AMQP.Basic.consume(chan, queue_name)
    {:noreply, %{state | chan: chan, consumer_tag: consumer_tag, conn: conn}}
  end

  # Subscription acknowledged: remember the consumer tag that was reported.
  def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, state) do
    Logger.info("Command listener registered with consumer tag: #{inspect(consumer_tag)}")
    {:noreply, %{state | consumer_tag: consumer_tag}}
  end

  # Consumer cancelled from the broker side: stop this process.
  def handle_info({:basic_cancel, %{consumer_tag: consumer_tag}}, state) do
    Logger.error("Command listener consumer stoped by the broker: #{consumer_tag}")
    {:stop, :normal, state}
  end

  # Cancellation confirmed: nothing to update, just report it.
  # NOTE(review): Logger.warn/1 is deprecated in recent Elixir releases in favour of
  # Logger.warning/1; it is kept here to stay compatible with older Elixir versions.
  def handle_info({:basic_cancel_ok, %{consumer_tag: _}}, state) do
    Logger.warn("Command listener consumer cancelled!")
    {:noreply, state}
  end

  # A message was delivered: hand it off without blocking the listener.
  def handle_info({:basic_deliver, payload, props = %{delivery_tag: _tag}}, state) do
    consume(props, payload, state)
    {:noreply, state}
  end

  # Catch-all. Defining handle_info/2 replaces the default implementation injected
  # by `use GenServer`, so without this clause any other message sent to the process
  # would raise FunctionClauseError and terminate the consumer.
  def handle_info(message, state) do
    Logger.warn("Command listener ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  @doc """
  Dispatches one delivered message to `CommandExecutor.handle_message/1`.

  `props` must contain the `:delivery_tag` and `:redelivered` keys and `state`
  must contain the `:chan` key. The handler runs in a new process linked to the
  caller; the pid of that process is returned.
  """
  @spec consume(map(), term(), map()) :: pid()
  def consume(props = %{delivery_tag: _, redelivered: _}, payload, _state = %{chan: chan}) do
    message_to_handle = MessageToHandle.new(props, payload, chan, @handlers_table_name)
    spawn_link(CommandExecutor, :handle_message, [message_to_handle])
  end
end