Skip to content
Permalink
Browse files

Merge pull request #837 from FarmBot/bug/amqp-disconnect

Fix AMQP and network disconnects causing limbo state
  • Loading branch information...
ConnorRigby committed Jul 11, 2019
1 parent 810f04e commit 0d0329d2a767e9e8f5d7cc4a63e2f0ded447fb8f
@@ -155,7 +155,8 @@ defmodule FarmbotCore.BotState do

@doc false
# Handles a `:subscribe` call: replies with the current `state.tree` and
# records the calling pid in `state.subscribers` (prepended, then
# de-duplicated with `Enum.uniq/1`).
# NOTE(review): both an active and a commented-out `Process.link(pid)` appear
# below; this looks like residue of a diff — confirm which one is intended.
def handle_call(:subscribe, {pid, _} = _from, state) do
Process.link(pid)
# TODO Just replace this with Elixir.Registry?
# Process.link(pid)
{:reply, state.tree, %{state | subscribers: Enum.uniq([pid | state.subscribers])}}
end

@@ -61,17 +61,26 @@ defmodule FarmbotCore.Log do
field(:commit, :string)
field(:target, :string)
field(:env, :string)
field(:hash, :binary)
field(:duplicates, :integer, default: 0)
timestamps()
end

@required_fields [:level, :verbosity, :message]
@optional_fields [:meta, :function, :file, :line, :module, :id, :inserted_at, :updated_at]
@optional_fields [:meta, :function, :file, :line, :module, :id, :inserted_at, :updated_at, :duplicates]

@doc """
Builds a changeset for `log` from `params`.

The log is first passed through `new/1`, then `params` are cast against the
required and optional field lists, the required fields are validated, and
finally `calculate_hash/1` is applied to the result.
"""
def changeset(log, params \\ %{}) do
  permitted_fields = @required_fields ++ @optional_fields
  casted = cast(new(log), params, permitted_fields)

  casted
  |> validate_required(@required_fields)
  |> calculate_hash()
end

@doc """
Stores the SHA-1 digest of the changeset's `:message` in its `:hash` field.

When `:message` is not a binary (for example it is missing and has already
been flagged by `validate_required/2`), the changeset is returned unchanged
so callers receive the invalid changeset instead of an `ArgumentError` raised
by `:crypto.hash/2`.
"""
def calculate_hash(changeset) do
  case Ecto.Changeset.get_field(changeset, :message) do
    message when is_binary(message) ->
      Ecto.Changeset.put_change(changeset, :hash, :crypto.hash(:sha, message))

    _missing_or_invalid ->
      # Nothing to hash; leave validation errors (if any) to speak for themselves.
      changeset
  end
end

def new(%Log{} = merge) do
@@ -55,13 +55,26 @@ defmodule FarmbotCore.Logger do
end
end

def insert_log!(%Log{} = log) do
def insert_log!(params) do
changeset = Log.changeset(%Log{}, params)
try do
log
|> Log.changeset(%{})
|> Repo.insert!()
hash = Ecto.Changeset.get_field(changeset, :hash)
case Repo.get_by(Log, hash: hash) do
nil ->
Repo.insert!(changeset)
old ->
params =
params
|> Map.put(:inserted_at, DateTime.utc_now)
|> Map.put(:duplicates, old.duplicates + 1)
old
|> Log.changeset(params)
|> Repo.update!()
end
catch
_, _ -> log
kind, err ->
IO.warn("Error inserting log: #{kind} #{inspect(err)}", __STACKTRACE__)
Ecto.Changeset.apply_changes(changeset)
end
end

@@ -88,8 +101,7 @@ defmodule FarmbotCore.Logger do
{fun, ar} -> "#{fun}/#{ar}"
nil -> "no_function"
end

struct!(FarmbotCore.Log,
%{
level: level,
verbosity: verbosity,
message: message,
@@ -98,13 +110,13 @@ defmodule FarmbotCore.Logger do
file: env.file,
line: env.line,
module: env.module
)
}
|> dispatch_log()
end

@doc false
def dispatch_log(%Log{} = log) do
log
def dispatch_log(params) do
params
|> insert_log!()
|> elixir_log()
end
@@ -0,0 +1,10 @@
defmodule FarmbotCore.Logger.Repo.Migrations.AddMessageHashToLogsTable do
  # Adds two columns to the "logs" table:
  #   * :hash       - binary
  #   * :duplicates - integer
  use Ecto.Migration

  def change do
    alter table("logs") do
      add :hash, :binary
      add :duplicates, :integer
    end
  end
end
@@ -17,27 +17,6 @@ defmodule FarmbotCore.BotStateTest do
assert match?({:error, %Ecto.Changeset{valid?: false}}, result)
refute_receive {BotState, %Ecto.Changeset{valid?: true}}
end

# Checks that a process which subscribes to BotState and then exits with
# :crash causes the BotState process to exit with the same reason.
test "subscribing links current process" do
# Trap exits so that the exits of both the BotState process and the
# subscriber process are delivered here as {:EXIT, pid, reason} messages.
Process.flag(:trap_exit, true)

# Two links to this test process: BotState (start_link) and the
# subscriber (spawn_link below).
{:ok, bot_state_pid} = BotState.start_link([], [])

# Subscriber body: subscribe to BotState, then exit with reason :crash.
fun = fn ->
_initial_state = BotState.subscribe(bot_state_pid)
exit(:crash)
end

# Spawn the subscriber function
fun_pid = spawn_link(fun)

# Make sure both the subscriber and BotState exit with :crash
assert_receive {:EXIT, ^fun_pid, :crash}
assert_receive {:EXIT, ^bot_state_pid, :crash}
end
end

describe "pins" do
@@ -18,7 +18,7 @@ defmodule FarmbotCore.LoggerTest do
)

# insert the log again
assert FarmbotCore.Logger.insert_log!(log)
assert FarmbotCore.Logger.insert_log!(Map.from_struct(log))

# Make sure the log is available for handling again.
assert Enum.find(
@@ -69,6 +69,12 @@ defmodule FarmbotExt.AMQP.AutoSyncChannel do
FarmbotCore.Logger.error(1, "Disconnected from AutoSync channel: #{inspect(reason)}")
# If a channel was still open, close it.
if state.chan, do: ConnectionWorker.close_channel(state.chan)

try do
EagerLoader.Supervisor.drop_all_cache()
catch
_, _ -> :ok
end
end

def handle_info(:preload, state) do
@@ -176,10 +176,12 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do

defp close_connection(nil), do: :ok

defp close_connection(%{pid: _} = conn) do
if Process.alive?(conn.pid) do
defp close_connection(%{pid: pid}) do
if Process.alive?(pid) do
try do
:ok = AMQP.Connection.close(conn)
Process.exit(pid, :close)
:ok
# :ok = AMQP.Connection.close(conn)
rescue
ex ->
message = Exception.message(ex)
@@ -112,6 +112,11 @@ defmodule FarmbotExt.API.EagerLoader do
{:noreply, %{state | cache: Map.put(state.cache, id, changeset)}}
end

# Handles the `:drop` cast: logs a debug line naming `state.module`, then
# continues with the `:cache` entry of the state reset to an empty map.
def handle_cast(:drop, state) do
  Logger.debug("dropping cache for: #{state.module}")
  emptied = Map.replace!(state, :cache, %{})
  {:noreply, emptied}
end

def handle_call({:get_cache, id}, _, state) do
{result, cache} = Map.pop(state.cache, id)
{:reply, result, %{state | cache: cache}}
@@ -24,6 +24,11 @@ defmodule FarmbotExt.API.EagerLoader.Supervisor do
Supervisor.start_link(__MODULE__, args, name: __MODULE__)
end

@doc """
Sends a `:drop` cast to every child currently listed by this supervisor.

Returns the list of results of the individual `GenServer.cast/2` calls.
"""
def drop_all_cache() do
  children = Supervisor.which_children(__MODULE__)

  for {_id, child, _type, _modules} <- children do
    GenServer.cast(child, :drop)
  end
end

def init(_args) do
children = [
{EagerLoader, Device},
@@ -174,6 +174,7 @@ defmodule FarmbotOS.Platform.Target.Network do
state
) do
FarmbotCore.Logger.warn(1, "Interface #{ifname} disconnected from the internet: #{ifstate}")
FarmbotExt.AMQP.ConnectionWorker.close()

if state.network_not_found_timer do
{:noreply, state}

0 comments on commit 0d0329d

Please sign in to comment.
You can’t perform that action at this time.