Skip to content
Permalink
Browse files

Fix CeleryScript channel reconnects **even more**

  • Loading branch information...
ConnorRigby committed Sep 10, 2019
1 parent 72777b0 commit 11eb29e08243875e12f866074a014674d0394f3f
@@ -26,7 +26,7 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do
def init(args) do
jwt = Keyword.fetch!(args, :jwt)
Process.flag(:sensitive, true)
send(self(), :timeout)
send(self(), :connect_amqp)
{:ok, %State{conn: nil, chan: nil, jwt: jwt, rpc_requests: %{}}}
end

@@ -36,9 +36,31 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do
if state.chan, do: AMQP.Channel.close(state.chan)
end

def handle_info(:timeout, state) do
status = ConnectionWorker.maybe_connect_celeryscript(state.jwt.bot)
compute_reply_from_amqp_state(state, status)
def handle_info(:connect_amqp, state) do
bot = state.jwt.bot
queue_name = "#{bot}_from_clients"
route = "bot.#{bot}.from_clients"

with %{} = conn <- ConnectionWorker.connection(),
{:ok, %{pid: channel_pid} = chan} <- Channel.open(conn),
Process.link(channel_pid),
:ok <- Basic.qos(chan, global: true),
{:ok, _} <- Queue.declare(chan, queue_name, auto_delete: true),
{:ok, _} <- Queue.purge(chan, queue_name),
:ok <- Queue.bind(chan, queue_name, @exchange, routing_key: route),
{:ok, _tag} <- Basic.consume(chan, queue_name, self(), no_ack: true) do
FarmbotCore.Logger.debug(3, "connected to CeleryScript channel")
{:noreply, %{state | conn: conn, chan: chan}}
else
nil ->
Process.send_after(self(), :connect_amqp, 5000)
{:noreply, %{state | conn: nil, chan: nil}}

err ->
FarmbotCore.Logger.error(1, "Failed to connect to CeleryScript channel: #{inspect(err)}")
Process.send_after(self(), :connect_amqp, 2000)
{:noreply, %{state | conn: nil, chan: nil}}
end
end

# Confirmation sent by the broker after registering this process as a consumer
@@ -133,23 +155,4 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do
{:noreply, state}
end
end

defp compute_reply_from_amqp_state(state, %{conn: conn, chan: chan}) do
{:noreply, %{state | conn: conn, chan: chan}}
end

defp compute_reply_from_amqp_state(state, error) do
# Run error warning if error not nil
if error,
do:
FarmbotCore.Logger.error(
1,
"Failed to connect to CeleryScript channel: #{inspect(error)}"
)

# Try to reconnect every 5 seconds. This should have some randomness
# sprinkled onto it in the case of mass disconnects etc.
Process.send_after(self(), :timeout, 5000)
{:noreply, %{state | conn: nil, chan: nil}}
end
end
@@ -45,17 +45,6 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
maybe_connect(chan_name, route, auto_delete, purge?)
end

@doc "Takes the 'bot' claim seen in the JWT and connects to the RPC server."
@callback maybe_connect_celeryscript(String.t()) :: map()
def maybe_connect_celeryscript(jwt_dot_bot) do
auto_delete = true
chan_name = jwt_dot_bot <> "_from_clients"
purge? = true
route = "bot.#{jwt_dot_bot}.from_clients"

maybe_connect(chan_name, route, auto_delete, purge?)
end

defp maybe_connect(chan_name, route, auto_delete, purge?) do
with %{} = conn <- FarmbotExt.AMQP.ConnectionWorker.connection(),
{:ok, chan} <- Channel.open(conn),

This file was deleted.

0 comments on commit 11eb29e

Please sign in to comment.
You can’t perform that action at this time.