Skip to content
Permalink
Browse files

Add telemetry to the rest of the app

More things will be added in the future.
Metrics are collected from bot state because
the amount of messages sent by every bot would
make datadog charge us a lot of money
  • Loading branch information...
ConnorRigby committed Oct 24, 2019
1 parent 49c60bd commit ec56ebaf744ce5b9ecbc28d407aab2456aff90ce
@@ -1,5 +1,6 @@
defmodule FarmbotCore.BotState.SchedulerUsageReporter do
alias FarmbotCore.BotState
require FarmbotTelemetry
use GenServer
@default_timeout_ms 5000

@@ -14,6 +14,7 @@ defmodule FarmbotExt.AMQP.AutoSyncChannel do

require Logger
require FarmbotCore.Logger
require FarmbotTelemetry

# The API dispatches messages for other resources, but these
# are the only ones that Farmbot needs to sync.
@@ -96,6 +97,7 @@ defmodule FarmbotExt.AMQP.AutoSyncChannel do
BotState.set_sync_status("sync_error")
_ = Leds.green(:slow_blink)
FarmbotCore.Logger.error(1, "Error preloading. #{inspect(reason)}")
FarmbotTelemetry.event(:asset_sync, :preload_error, nil, error: inspect(reason))
Process.send_after(self(), :preload, 5000)
{:noreply, state}
end
@@ -8,6 +8,7 @@ defmodule FarmbotExt.AMQP.BotStateChannel do
alias AMQP.Channel

require FarmbotCore.Logger
require FarmbotTelemetry
alias FarmbotCore.JSON
alias FarmbotExt.AMQP.ConnectionWorker

@@ -54,12 +55,14 @@ defmodule FarmbotExt.AMQP.BotStateChannel do
with %{} = conn <- ConnectionWorker.connection(),
{:ok, chan} <- Channel.open(conn),
:ok <- Basic.qos(chan, global: true) do
FarmbotTelemetry.event(:amqp, :channel_open)
{:noreply, %{state | conn: conn, chan: chan}, {:continue, :dispatch}}
else
nil ->
{:noreply, %{state | conn: nil, chan: nil}, 5000}

err ->
FarmbotTelemetry.event(:amqp, :channel_open_error, nil, error: inspect(err))
FarmbotCore.Logger.error(1, "Failed to connect to BotState channel: #{inspect(err)}")
{:noreply, %{state | conn: nil, chan: nil}, 1000}
end
@@ -8,6 +8,7 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do

alias FarmbotCore.JSON
require FarmbotCore.Logger
require FarmbotTelemetry
require Logger

alias FarmbotCeleryScript.{AST, StepRunner}
@@ -50,6 +51,8 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do
:ok <- Queue.bind(chan, queue_name, @exchange, routing_key: route),
{:ok, _tag} <- Basic.consume(chan, queue_name, self(), no_ack: true) do
FarmbotCore.Logger.debug(3, "connected to CeleryScript channel")
FarmbotTelemetry.event(:amqp, :channel_open)
FarmbotTelemetry.event(:amqp, :queue_bind, nil, queue_name: queue_name, routing_key: route)
{:noreply, %{state | conn: conn, chan: chan}}
else
nil ->
@@ -58,6 +61,7 @@ defmodule FarmbotExt.AMQP.CeleryScriptChannel do

err ->
FarmbotCore.Logger.error(1, "Failed to connect to CeleryScript channel: #{inspect(err)}")
FarmbotTelemetry.event(:amqp, :channel_open_error, nil, error: inspect(err))
Process.send_after(self(), :connect_amqp, 2000)
{:noreply, %{state | conn: nil, chan: nil}}
end
@@ -6,6 +6,7 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
use GenServer
require Logger
require FarmbotCore.Logger
require FarmbotTelemetry
alias AMQP.{Basic, Channel, Queue}

alias FarmbotExt.{JWT, AMQP.ConnectionWorker}
@@ -55,10 +56,17 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
{:ok, _} <- Basic.consume(chan, chan_name, self(), no_ack: true) do
Process.link(conn.pid)
Process.link(chan.pid)
FarmbotTelemetry.event(:amqp, :channel_open)
FarmbotTelemetry.event(:amqp, :queue_bind, nil, queue_name: chan_name, routing_key: route)

%{conn: conn, chan: chan}
else
nil -> %{conn: nil, chan: nil}
error -> error
nil ->
%{conn: nil, chan: nil}

error ->
FarmbotTelemetry.event(:amqp, :channel_open_error, nil, error: inspect(error))
error
end
end

@@ -107,11 +115,13 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do

case open_connection(token, email, jwt.bot, jwt.mqtt, jwt.vhost) do
{:ok, conn} ->
FarmbotTelemetry.event(:amqp, :connection_open)
Process.monitor(conn.pid)
{:noreply, %{state | conn: conn}}

err ->
Logger.error("Error Opening AMQP connection: #{inspect(err)}")
FarmbotTelemetry.event(:amqp, :connection_open_error, nil, error: inspect(err))
Process.send_after(self(), :timeout, 5000)
{:noreply, %{state | conn: nil}}
end
@@ -120,6 +130,7 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
def handle_info({:DOWN, _ref, :process, _pid, reason}, state) do
FarmbotCore.Logger.error(2, "AMQP Connection exit")
_ = close_connection(state.conn)
FarmbotTelemetry.event(:amqp, :connection_close)
{:stop, reason, state}
end

@@ -131,6 +142,7 @@ defmodule FarmbotExt.AMQP.ConnectionWorker do
def handle_call(:close, _from, %{conn: _conn} = state) do
FarmbotCore.Logger.error(2, "AMQP Connection closing")
reply = close_connection(state.conn)
FarmbotTelemetry.event(:amqp, :connection_close)
{:stop, :close, reply, %{state | conn: nil}}
end

@@ -9,6 +9,7 @@ defmodule FarmbotExt.AMQP.LogChannel do

alias FarmbotCore.{BotState, JSON}
require FarmbotCore.Logger
require FarmbotTelemetry

alias FarmbotExt.AMQP.ConnectionWorker
require Logger
@@ -40,6 +41,7 @@ defmodule FarmbotExt.AMQP.LogChannel do
with %{} = conn <- ConnectionWorker.connection(),
{:ok, chan} <- Channel.open(conn),
:ok <- Basic.qos(chan, global: true) do
FarmbotTelemetry.event(:amqp, :channel_open)
initial_bot_state = BotState.subscribe()
{:noreply, %{state | conn: conn, chan: chan, state_cache: initial_bot_state}, 0}
else
@@ -48,6 +50,7 @@ defmodule FarmbotExt.AMQP.LogChannel do

err ->
FarmbotCore.Logger.error(1, "Failed to connect to Log channel: #{inspect(err)}")
FarmbotTelemetry.event(:amqp, :channel_open_error, nil, error: inspect(err))
{:noreply, %{state | conn: nil, chan: nil, state_cache: nil}, 1000}
end
end
@@ -12,6 +12,7 @@ defmodule FarmbotExt.AMQP.PingPongChannel do

require Logger
require FarmbotCore.Logger
require FarmbotTelemetry
alias FarmbotCore.Leds

@exchange "amq.topic"
@@ -66,6 +67,10 @@ defmodule FarmbotExt.AMQP.PingPongChannel do
{:ok, _} <- Queue.purge(chan, ping),
:ok <- Queue.bind(chan, ping, @exchange, routing_key: route <> ".#"),
{:ok, _tag} <- Basic.consume(chan, ping, self(), no_ack: true) do
FarmbotTelemetry.event(:amqp, :channel_open)

FarmbotTelemetry.event(:amqp, :queue_bind, nil, queue_name: ping, routing_key: route <> ".#")

FarmbotCore.Logger.debug(3, "connected to PingPong channel")
_ = Leds.blue(:solid)
{:noreply, %{state | conn: conn, chan: chan}}
@@ -76,6 +81,7 @@ defmodule FarmbotExt.AMQP.PingPongChannel do

err ->
FarmbotCore.Logger.error(1, "Failed to connect to PingPong channel: #{inspect(err)}")
FarmbotTelemetry.event(:amqp, :channel_open_error, nil, error: inspect(err))
Process.send_after(self(), :connect_amqp, 2000)
{:noreply, %{state | conn: nil, chan: nil}}
end
@@ -4,12 +4,16 @@ defmodule FarmbotExt.AMQP.TelemetryChannel do
use GenServer
use AMQP

alias FarmbotCore.{BotState, BotStateNG}
alias FarmbotExt.AMQP.ConnectionWorker
require FarmbotCore.Logger
require FarmbotTelemetry

@exchange "amq.topic"
@dispatch_metrics_timeout 300_000
@consume_telemetry_timeout 1000

defstruct [:conn, :chan, :jwt]
defstruct [:conn, :chan, :jwt, :cache]
alias __MODULE__, as: State

@doc false
@@ -21,11 +25,13 @@ defmodule FarmbotExt.AMQP.TelemetryChannel do
Process.flag(:sensitive, true)
jwt = Keyword.fetch!(args, :jwt)
send(self(), :connect_amqp)
cache = BotState.subscribe()

state = %State{
conn: nil,
chan: nil,
jwt: jwt
jwt: jwt,
cache: cache
}

{:ok, state}
@@ -47,8 +53,10 @@ defmodule FarmbotExt.AMQP.TelemetryChannel do
:ok <- Basic.qos(chan, global: true),
{:ok, _} <- Queue.declare(chan, telemetry, auto_delete: true),
{:ok, _} <- Queue.purge(chan, telemetry) do
FarmbotTelemetry.event(:amqp, :channel_open)
FarmbotCore.Logger.debug(3, "connected to Telemetry channel")
send(self(), :consume_telemetry)
send(self(), :dispatch_metrics)
{:noreply, %{state | conn: conn, chan: chan}}
else
nil ->
@@ -57,11 +65,39 @@ defmodule FarmbotExt.AMQP.TelemetryChannel do

err ->
FarmbotCore.Logger.error(1, "Failed to connect to Telemetry channel: #{inspect(err)}")
FarmbotTelemetry.event(:amqp, :channel_open_error, nil, error: inspect(err))
Process.send_after(self(), :connect_amqp, 2000)
{:noreply, %{state | conn: nil, chan: nil}}
end
end

def handle_info({BotState, change}, state) do
cache = Ecto.Changeset.apply_changes(change)
{:noreply, %{state | cache: cache}}
end

def handle_info(:dispatch_metrics, state) do
metrics = BotStateNG.view(state.cache).informational_settings

json =
FarmbotCore.JSON.encode!(%{
"telemetry_captured_at" => DateTime.utc_now(),
"telemetry_soc_temp" => metrics.soc_temp,
"telemetry_throttled" => metrics.throttled,
"telemetry_wifi_level" => metrics.wifi_level,
"telemetry_wifi_level_percent" => metrics.wifi_level_percent,
"telemetry_uptime" => metrics.uptime,
"telemetry_memory_usage" => metrics.memory_usage,
"telemetry_disk_usage" => metrics.disk_usage,
"telemetry_scheduler_usage" => metrics.scheduler_usage,
"telemetry_cpu_usage" => metrics.cpu_usage
})

Basic.publish(state.chan, @exchange, "bot.#{state.jwt.bot}.telemetry", json)
Process.send_after(self(), :dispatch_metrics, @dispatch_metrics_timeout)
{:noreply, state}
end

def handle_info(:consume_telemetry, state) do
_ =
FarmbotTelemetry.consume_telemetry(fn
@@ -80,7 +116,7 @@ defmodule FarmbotExt.AMQP.TelemetryChannel do
Basic.publish(state.chan, @exchange, "bot.#{state.jwt.bot}.telemetry", json)
end)

_ = Process.send_after(self(), :consume_telemetry, 1000)
_ = Process.send_after(self(), :consume_telemetry, @consume_telemetry_timeout)
{:noreply, state}
end
end
@@ -90,6 +90,8 @@ config :farmbot_core, FarmbotCore.Asset.Repo,
pool_size: 1,
database: Path.join(data_path, "asset-prod.sqlite3")

config :farmbot_telemetry, file: to_charlist(Path.join(data_path, 'farmbot-telemetry.dets'))

config :farmbot, FarmbotOS.Platform.Supervisor,
platform_children: [
FarmbotOS.Platform.Target.NervesHubClient,
@@ -90,6 +90,8 @@ config :farmbot_core, FarmbotCore.Asset.Repo,
pool_size: 1,
database: Path.join(data_path, "asset-#{Mix.env()}.sqlite3")

config :farmbot_telemetry, file: to_charlist(Path.join(data_path, 'farmbot-telemetry.dets'))

config :farmbot, FarmbotOS.Platform.Supervisor,
platform_children: [
FarmbotOS.Platform.Target.NervesHubClient,
@@ -1,6 +1,7 @@
defmodule FarmbotOS.Configurator.Router do
@moduledoc "Routes web connections for configuring farmbot os"
require FarmbotCore.Logger
require FarmbotTelemetry

import Phoenix.HTML
use Plug.Router
@@ -40,6 +41,8 @@ defmodule FarmbotOS.Configurator.Router do
end

get "/" do
FarmbotTelemetry.event(:configurator, :configuration_start)

case load_last_reset_reason() do
reason when is_binary(reason) ->
if String.contains?(reason, "CeleryScript request.") do
@@ -1,5 +1,6 @@
defmodule FarmbotOS.SysCalls do
require FarmbotCore.Logger
require FarmbotTelemetry
require Logger

alias FarmbotCeleryScript.AST
@@ -264,6 +265,7 @@ defmodule FarmbotOS.SysCalls do
:ok
else
error ->
FarmbotTelemetry.event(:asset_sync, :sync_error, nil, error: inspect(error))
:ok = BotState.set_sync_status("sync_error")
_ = Leds.green(:slow_blink)
{:error, inspect(error)}
@@ -1,7 +1,8 @@
defmodule FarmbotOS.SysCalls.DumpInfo do
@moduledoc false
require FarmbotCore.Logger
alias FarmbotCore.{Asset, Asset.Private, Config, Project}
require FarmbotTelemetry
alias FarmbotCore.{Asset, Asset.DiagnosticDump, Asset.Private, Config, Project}

def dump_info do
FarmbotCore.Logger.busy(1, "Recording diagnostic dump.")
@@ -28,9 +29,15 @@ defmodule FarmbotOS.SysCalls.DumpInfo do
{:ok, diag} ->
_ = Private.mark_dirty!(diag, %{})
FarmbotCore.Logger.success(1, "Diagnostic dump recorded.")

FarmbotTelemetry.event(:diagnostic_dump, :record, nil,
diagnostic_dump: DiagnosticDump.render(diag)
)

:ok

{:error, changeset} ->
FarmbotTelemetry.event(:diagnostic_dump, :record_error, nil, error: inspect(changeset))
{:error, "error creating diagnostic dump: #{inspect(changeset)}"}
end
end

0 comments on commit ec56eba

Please sign in to comment.
You can’t perform that action at this time.