
Refactor and improve testing of metrics polling #235 (#236)
Co-authored-by: Samuel <samuel@uniris.io>
apoorv-2204 and Samuel committed Mar 8, 2022
1 parent da1a29c commit 75800a0
Showing 17 changed files with 654 additions and 578 deletions.
10 changes: 1 addition & 9 deletions assets/js/app.js
@@ -74,17 +74,14 @@ Hooks.Logs = {
}
}


+ //metric dashboard hook /metrics/dashboard
Hooks.network_charts = {
mounted() {

var network_metric_obj = metric_config_obj.create_network_live_visuals();
this.handleEvent("network_points", ({
points
}) => {
console.log("---------------")
console.log(points);
console.log("------------------")
points = metric_config_obj.structure_metric_points(points)

network_metric_obj = metric_config_obj.update_network_live_visuals(network_metric_obj , points);
@@ -103,12 +103,7 @@ Hooks.explorer_charts = {
this.handleEvent("explorer_stats_points", ({
points
}) => {
- console.log(points);
- console.log("------------------")
points = metric_config_obj.structure_metric_points(points)
- console.log("=================")
- console.log(points);
- console.log("=================")
explorer_metric_obj = metric_config_obj.update_explorer_live_visuals(explorer_metric_obj , points);
});
}
44 changes: 18 additions & 26 deletions assets/js/metric_config.js
@@ -48,7 +48,7 @@ function get_visuals_dom(){
};
}

function generateEchartObjects(heading , echartContainer , x_axis_data){
var y_axis_data =
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
var chart = echarts.init(document.getElementById(echartContainer));
@@ -111,10 +111,10 @@ function get_visuals_dom(){

return { chart: chart , ydata: y_axis_data};

}


function generate_echart_guage(heading , eguageContainer ){
var guage= echarts.init(document.getElementById(eguageContainer));

var guage_options ={
@@ -206,23 +206,22 @@ function get_visuals_dom(){

return {"guage": guage , "max": 0}
}


// for proper display of axis labels
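// e.g. (illustrative): 1234567 -> "1.23e+6", 54321 -> 54321, 0.5 -> "0.50", 0.00005 -> "5.00e-5"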
function exponent_formatter(new_point) {
if(new_point ==0) return 0
else if (new_point >100000 || new_point <0.0001 ) return parseFloat(new_point).toExponential(2);
else if (new_point <100000 && new_point >=100 ) return Math.floor(parseFloat(new_point));
else if(new_point <100 && new_point >= 0.0001) return parseFloat(new_point).toPrecision(2);
}

//update the charts with new data
function update_chart_data(chart_obj,x_axis_data ,points, point_name){
- var new_point = Math.random();
- var new_data= chart_obj.ydata[chart_obj.ydata.length-1] + new_point;
- // var new_point = points[point_name];
- // console.log(new_point)
- // var new_data = chart_obj.ydata[chart_obj.ydata.length-1] + new_point;
- var shifted = chart_obj.ydata.shift();
+ var new_data = 0 , new_point =0 ,shifted_value = 0;
+ new_point = points[point_name];
+ new_data = chart_obj.ydata[chart_obj.ydata.length-1] + new_point;
+ shifted_value = chart_obj.ydata.shift();
chart_obj.ydata.push(new_data);
- // console.log(chart_obj.ydata);
- // console.log(x_axis_data);
chart_obj.chart.setOption({
xAxis: {
data: x_axis_data
@@ -234,28 +233,22 @@ function update_chart_data(chart_obj,x_axis_data ,points, point_name){
});
}

- // function update_card_data(card_obj , points ,point_name ){
- //   card_obj.textContent = points[point_name]
- // }

// update the guage with new data
function update_guage_data(guage_obj , points , point_name )
{
- var data =0 ,new_point =0;
- // new_point = (Math.random()*100)+(Math.random()*.0010)+(Math.random()*0.001)+(Math.random()*.01)+(Math.random()*0.01);
- new_point = points[point_name];
- // console.log(new_point)
- data = new_point;
- if(data >= guage_obj.max ){
-   guage_obj.max = data
+ var new_data =0 ,new_point =0;
+ new_point = points[point_name];
+ new_data = new_point;
+ if(new_data >= guage_obj.max ){
+   guage_obj.max = new_data
}
guage_obj.guage.setOption({series:
[
{ min: 0,
max:guage_obj.max ,splitNumber: 5,
-      data: [{ value: data }]}]});
+      data: [{ value: new_data }]}]});
}


function create_network_live_visuals(){
var metric_obj = get_visuals_dom();

@@ -268,7 +261,6 @@ function update_network_live_visuals(network_metric_obj , points){
}

function update_live_visuals(metric_obj , points){
- // console.log()
metric_obj.seconds_after_loading_of_this_graph+= 10;
var shifted = metric_obj.x_axis_data.shift();
metric_obj.x_axis_data.push(metric_obj.seconds_after_loading_of_this_graph);
3 changes: 3 additions & 0 deletions config/test.exs
@@ -103,6 +103,9 @@ config :archethic, ArchEthic.P2P.BootstrappingSeeds, enabled: false

config :archethic, ArchEthic.Mining.PendingTransactionValidation, validate_node_ip: true

+ config :archethic, ArchEthic.Metrics.Poller, enabled: false
+ config :archethic, ArchEthic.Metrics.Collector, MockMetricsCollector

config :archethic, ArchEthic.Reward.NetworkPoolScheduler, enabled: false
config :archethic, ArchEthic.Reward.WithdrawScheduler, enabled: false

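The MockMetricsCollector referenced above is not defined in this diff. Assuming the test suite uses Mox (consistent with the Mock* naming convention in this config), the definition would be a one-liner along these lines — a sketch, not necessarily the commit's exact code:

    Mox.defmock(MockMetricsCollector, for: ArchEthic.Metrics.Collector)

Since fetch_metrics/1 is the behaviour's only callback, tests can stub node responses without opening any sockets.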
119 changes: 119 additions & 0 deletions lib/archethic/metrics/aggregator.ex
@@ -0,0 +1,119 @@
defmodule ArchEthic.Metrics.Aggregator do
@moduledoc """
Provides aggregation methods for the collected metrics
"""

@doc """
Prepend a TPS entry computed from the transaction validation durations (total count / total duration)
## Examples
iex> [
...> %{ "archethic_mining_full_transaction_validation_duration" => %{count: 10, sum: 1.5}},
...> %{ "archethic_mining_full_transaction_validation_duration" => %{count: 10, sum: 1.5 }},
...> %{ "archethic_mining_full_transaction_validation_duration" => %{count: 10, sum: 1.5 }},
...> ] |> Aggregator.inject_tps()
[
%{ "tps" => 6.666666666666667 },
%{ "archethic_mining_full_transaction_validation_duration" => %{count: 10, sum: 1.5}},
%{ "archethic_mining_full_transaction_validation_duration" => %{count: 10, sum: 1.5}},
%{ "archethic_mining_full_transaction_validation_duration" => %{count: 10, sum: 1.5}}
]
"""
@spec inject_tps(list(map())) :: list(map())
def inject_tps(metrics) do
%{count: count, sum: sum} =
Enum.reduce(metrics, %{count: 0, sum: 0.0}, fn
%{
"archethic_mining_full_transaction_validation_duration" => %{count: count, sum: sum}
},
acc ->
acc
|> Map.update!(:count, &(&1 + count))
|> Map.update!(:sum, &(&1 + sum))

_, acc ->
acc
end)

if count > 0.0 do
tps = count / sum
[%{"tps" => tps} | metrics]
else
metrics
end
end
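  # Worked example: in the doctest above the three nodes report 10 validations
  # each over 1.5 s each, so TPS = (10 + 10 + 10) / (1.5 + 1.5 + 1.5) ≈ 6.67.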

@doc """
Reduce a list of metric maps into a single map, aggregating the values per metric name
## Examples
iex> [
...> %{"archethic_p2p_send_message_duration" => %{count: 100, sum: 10}},
...> %{"archethic_p2p_send_message_duration" => %{count: 200, sum: 20}},
...> %{"archethic_p2p_send_message_duration" => %{count: 300, sum: 30}}
...> ]|> Aggregator.reduce_values()
%{
"archethic_p2p_send_message_duration" => %{count: 600, sum: 60}
}
"""
@spec reduce_values(list(map())) :: map()
def reduce_values(list_of_metrics) do
list_of_metrics
|> Enum.map(&Map.to_list/1)
|> Enum.reduce(%{}, fn
[{"tps", tps}], acc ->
Map.put(acc, "tps", tps)

[{metric_name, %{count: count, sum: sum}}], acc ->
update_histogram_acc(metric_name, count, sum, acc)

[{metric_name, value}], acc ->
update_guage_acc(metric_name, value, acc)
end)
end

defp update_histogram_acc(metric_name, count, sum, acc) do
acc
|> update_in([Access.key(metric_name, %{}), Access.key(:sum, 0)], &(&1 + sum))
|> update_in([Access.key(metric_name, %{}), Access.key(:count, 0)], &(&1 + count))
end

defp update_guage_acc(metric_name, value, acc) do
update_in(acc, [Access.key(metric_name, 0)], &(&1 + value))
end

@doc """
Aggregate and summarize the metrics
For histograms, it produces an average of the values by doing: `sum/count`
## Examples
iex> %{
...> "archethic_mining_full_transaction_validation_duration" => %{count: 5, sum: 10},
...> "archethic_p2p_send_message_duration" => %{count: 600, sum: 60},
...> "tps" => 10.0,
...> "vm_memory_atom" => 600.0
...> }|> Aggregator.summarize()
[
%{"archethic_mining_full_transaction_validation_duration" => 2.0},
%{"archethic_p2p_send_message_duration" => 0.1},
%{"tps" => 10.0},
%{"vm_memory_atom" => 600.0}
]
"""
@spec summarize(%{
String.t() => number() | %{count: number(), sum: number()}
}) :: [%{String.t() => number()}, ...]
def summarize(map_of_metrics) do
Enum.map(map_of_metrics, fn {metric_name, metric_value} ->
case metric_value do
%{count: 0, sum: _sum} -> %{metric_name => 0.0}
%{count: count, sum: sum} -> %{metric_name => sum / count}
value -> %{metric_name => value}
end
end)
end
end
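Chained in the order used by Collector.retrieve_network_metrics/1, the three functions turn per-node samples into one summary per metric. A sketch built from the doctest values above:

    iex> [
    ...>   %{"archethic_mining_full_transaction_validation_duration" => %{count: 10, sum: 1.5}},
    ...>   %{"vm_memory_atom" => 300.0},
    ...>   %{"vm_memory_atom" => 300.0}
    ...> ]
    ...> |> Aggregator.inject_tps()
    ...> |> Aggregator.reduce_values()
    ...> |> Aggregator.summarize()
    [
      %{"archethic_mining_full_transaction_validation_duration" => 0.15},
      %{"tps" => 6.666666666666667},
      %{"vm_memory_atom" => 600.0}
    ]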
62 changes: 62 additions & 0 deletions lib/archethic/metrics/collector.ex
@@ -0,0 +1,62 @@
defmodule ArchEthic.Metrics.Collector do
@moduledoc """
Handles the flow of metrics collection: fetching, parsing, filtering and aggregating node metrics
"""

alias ArchEthic.Metrics.Aggregator
alias ArchEthic.Metrics.Parser
alias ArchEthic.P2P

@callback fetch_metrics(:inet.ip_address()) :: {:ok, String.t()} | {:error, any()}

@doc """
Get the list of Node IP addresses
"""
@spec retrieve_node_ip_addresses() :: list(:inet.ip_address())
def retrieve_node_ip_addresses do
Enum.map(P2P.authorized_nodes(), & &1.ip)
end

@doc """
Retrieve, aggregate and summarize metrics from the given node IP addresses.
"""
@spec retrieve_network_metrics(list(:inet.ip_address())) :: map()
def retrieve_network_metrics(node_ip_addresses) do
Task.async_stream(node_ip_addresses, &service().fetch_metrics(&1))
|> Stream.filter(&match?({:ok, {:ok, _}}, &1))
|> Stream.map(fn {:ok, {:ok, result}} -> result end)
|> Stream.map(&Parser.extract_from_string/1)
|> Stream.map(&filter_metrics/1)
|> Stream.map(&Parser.reduce_metrics/1)
|> Enum.flat_map(& &1)
|> Aggregator.inject_tps()
|> Aggregator.reduce_values()
|> Aggregator.summarize()
|> reduce_to_single_map()
end
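  # The pipeline above, shape by shape (inferred from the Aggregator doctests):
  # raw Prometheus text per node -> parsed entries (%{name: ..., ...}) ->
  # only the three mining/p2p duration metrics kept -> [%{name => %{count, sum}}]
  # -> TPS prepended -> reduced across nodes -> averaged -> one %{name => value} map.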

defp service do
Application.get_env(
:archethic,
__MODULE__,
__MODULE__.MetricsEndpoint
)
end

defp filter_metrics(metrics) do
Enum.filter(metrics, fn %{name: name} -> accept_metric?(name) end)
end

defp accept_metric?("archethic_mining_proof_of_work_duration"), do: true
defp accept_metric?("archethic_mining_full_transaction_validation_duration"), do: true
defp accept_metric?("archethic_p2p_send_message_duration"), do: true
defp accept_metric?(_), do: false

defp reduce_to_single_map(data_list_of_maps) do
Enum.reduce(data_list_of_maps, fn a, b ->
Map.merge(a, b, fn _key, a1, a2 ->
a1 + a2
end)
end)
end
end
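Combined with the MockMetricsCollector entry in config/test.exs above, the polling flow is testable end to end without HTTP. A sketch — it assumes Mox, and that Parser consumes the standard Prometheus text exposition format (the Parser module is not part of this diff):

    Mox.expect(MockMetricsCollector, :fetch_metrics, fn _ip ->
      {:ok, """
      # TYPE archethic_p2p_send_message_duration histogram
      archethic_p2p_send_message_duration_sum 10.0
      archethic_p2p_send_message_duration_count 100
      """}
    end)

    assert %{"archethic_p2p_send_message_duration" => 0.1} =
             Collector.retrieve_network_metrics([{127, 0, 0, 1}])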
53 changes: 53 additions & 0 deletions lib/archethic/metrics/collector/metrics_endpoint.ex
@@ -0,0 +1,53 @@
defmodule ArchEthic.Metrics.Collector.MetricsEndpoint do
@moduledoc """
Collector implementation that fetches metrics from a node's HTTP `/metrics` endpoint.
"""

alias ArchEthic.Metrics.Collector

@behaviour Collector

@node_metric_endpoint_uri "/metrics"
@node_metric_request_type "GET"

@impl Collector
def fetch_metrics(ip_address) do
with {:ok, conn_ref} <- establish_connection(ip_address),
{:ok, conn, _req_ref} <- request(conn_ref) do
stream_responses(conn)
end
end

defp establish_connection(ip) do
port =
Application.get_env(:archethic, ArchEthicWeb.Endpoint)
|> Keyword.get(:http)
|> Keyword.get(:port)

Mint.HTTP.connect(:http, ip |> :inet.ntoa() |> to_string(), port)
end

defp request(conn_ref) do
Mint.HTTP.request(
conn_ref,
@node_metric_request_type,
@node_metric_endpoint_uri,
[],
[]
)
end

defp stream_responses(conn) do
receive do
message ->
with {:ok, conn, [{:status, _, 200}, {:headers, _, _}, {:data, _, data}, {:done, _}]} <-
Mint.HTTP.stream(conn, message),
{:ok, _} <- Mint.HTTP.close(conn) do
{:ok, data}
end
after
5_000 ->
{:error, :timeout}
end
end
end
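Two design notes, both visible in the code above: the collector opens a fresh Mint connection per poll and closes it after a single response, and stream_responses/1 expects the entire body to arrive in one message — a response split across several :data frames would not match the with clause and falls through. Expected usage, assuming a node serving /metrics on the configured web port:

    iex> ArchEthic.Metrics.Collector.MetricsEndpoint.fetch_metrics({127, 0, 0, 1})
    {:ok, "# TYPE archethic_p2p_send_message_duration histogram ..."}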