Skip to content

Commit

Permalink
Optimize network stats fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Mar 21, 2023
1 parent d3f8d51 commit b881c8c
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 172 deletions.
99 changes: 7 additions & 92 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Archethic.BeaconChain do
alias __MODULE__.Slot.Validation, as: SlotValidation
alias __MODULE__.SlotTimer
alias __MODULE__.Subset
alias __MODULE__.NetworkCoordinates
alias __MODULE__.Subset.P2PSampling
alias __MODULE__.Subset.SummaryCache
alias __MODULE__.Summary
Expand Down Expand Up @@ -194,6 +195,10 @@ defmodule Archethic.BeaconChain do
@spec get_summary_slots(binary()) :: list(TransactionSummary.t())
def get_summary_slots(subset) when is_binary(subset) do
SummaryCache.stream_current_slots(subset)
|> Stream.map(fn
{slot, _} -> slot
slot -> slot
end)
|> Stream.flat_map(fn %Slot{transaction_attestations: transaction_attestations} ->
transaction_summaries =
transaction_attestations
Expand Down Expand Up @@ -419,97 +424,7 @@ defmodule Archethic.BeaconChain do
Retrieve the network stats for a given subset from the cached slots
"""
@spec get_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()}
def get_network_stats(subset) do
subset
|> SummaryCache.stream_current_slots()
|> Stream.filter(&match?({_, _}, &1))
|> Stream.map(fn
{%Slot{p2p_view: %{network_stats: net_stats}}, node} ->
{node, net_stats}
end)
|> Stream.reject(fn
{_, %{latencies: []}} ->
true

_ ->
false
end)
|> Enum.reduce(%{}, fn {node, net_stats}, acc ->
Map.update(acc, node, [net_stats], &(&1 ++ [net_stats]))
end)
|> Enum.map(fn {node, net_stats} ->
aggregated_stats =
net_stats
|> Enum.zip()
|> Enum.map(fn stats ->
aggregated_latency =
stats
|> Tuple.to_list()
|> Enum.map(& &1.latency)
# The logistic regression is used to avoid impact of outliers
# while providing a weighted approach to priorize the latest samples.
|> weighted_logistic_regression()
|> trunc()

%{latency: aggregated_latency}
end)

{node, aggregated_stats}
end)
|> Enum.into(%{})
end

defp weighted_logistic_regression(list) do
%{sum_weight: sum_weight, sum_weighted_list: sum_weighted_list} =
list
|> clean_outliers()
# We want to apply a weight based on the tier of the latency
|> Utils.chunk_list_in(3)
|> weight_list()
|> Enum.reduce(%{sum_weight: 0.0, sum_weighted_list: 0.0}, fn {weight, weighted_list},
acc ->
acc
|> Map.update!(:sum_weighted_list, &(&1 + Enum.sum(weighted_list)))
|> Map.update!(:sum_weight, &(&1 + weight * Enum.count(weighted_list)))
end)

sum_weighted_list / sum_weight
end

defp clean_outliers(list) do
list_size = Enum.count(list)

sorted_list = Enum.sort(list)

# Compute percentiles (P80, P20) to remove the outliers
p1 = (0.8 * list_size) |> trunc()
p2 = (0.2 * list_size) |> trunc()

max = Enum.at(sorted_list, p1)
min = Enum.at(sorted_list, p2)

Enum.map(list, fn
x when x < min ->
min

x when x > max ->
max

x ->
x
end)
end

defp weight_list(list) do
list
|> Enum.with_index()
|> Enum.map(fn {list, i} ->
# Apply weight of the tier
weight = (i + 1) * (1 / 3)

weighted_list = Enum.map(list, &(&1 * weight))

{weight, weighted_list}
end)
def get_network_stats(subset) when is_binary(subset) do
NetworkCoordinates.aggregate_network_stats(subset)
end
end
175 changes: 146 additions & 29 deletions lib/archethic/beacon_chain/network_coordinates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
@digits ["F", "E", "D", "C", "B", "A", "9", "8", "7", "6", "5", "4", "3", "2", "1", "0"]

alias Archethic.BeaconChain
alias Archethic.BeaconChain.Slot
alias Archethic.BeaconChain.Subset.SummaryCache
alias Archethic.BeaconChain.Subset.P2PSampling

alias Archethic.Crypto

alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.GetNetworkStats
alias Archethic.P2P.Message.NetworkStats

alias Archethic.Utils

alias Archethic.TaskSupervisor

@doc """
Expand Down Expand Up @@ -211,41 +217,64 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do

matrix = Nx.broadcast(0, {nb_nodes, nb_nodes})

Task.async_stream(
BeaconChain.list_subsets(),
fn subset ->
%{bounded: bounded_subsets, unbounded: unbouded_subsets} =
Enum.reduce(BeaconChain.list_subsets(), %{bounded: [], unbounded: []}, fn subset, acc ->
beacon_nodes = Election.beacon_storage_nodes(subset, summary_time, authorized_nodes)

stats =
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
beacon_nodes,
&P2P.send_message(&1, %GetNetworkStats{subset: subset}),
on_timeout: :kill_task,
ordered: false
)
|> Stream.filter(fn
# Filter slots with P2P view
{:ok, {:ok, %NetworkStats{stats: stats}}} when map_size(stats) > 0 ->
true

_ ->
false
end)
|> Stream.map(fn {:ok, {:ok, %NetworkStats{stats: stats}}} -> Map.to_list(stats) end)
|> Enum.to_list()
|> List.flatten()
|> Enum.reduce(%{}, fn {node, stats}, acc ->
Map.put(acc, node, stats)
end)
if Utils.key_in_node_list?(beacon_nodes, Crypto.first_node_public_key()) do
Map.update!(acc, :bounded, &[subset | &1])
else
Map.update!(acc, :unbounded, &[{subset, beacon_nodes} | &1])
end
end)

bounded_netstats = Enum.map(bounded_subsets, &{&1, aggregate_network_stats(&1)})

matrix =
unbouded_subsets
|> stream_unbounded_subsets_stats()
|> update_matrix_from_stats(matrix, sorted_node_list)

update_matrix_from_stats(bounded_netstats, matrix, sorted_node_list)
end

{subset, stats}
defp stream_unbounded_subsets_stats(unbounded_subsets) do
aggregated_subsets_by_node =
unbounded_subsets
|> Enum.reduce(%{}, fn {subset, nodes}, acc ->
Enum.reduce(nodes, acc, fn node, acc ->
Map.update(acc, node, [subset], &[subset | &1])
end)
end)

Task.Supervisor.async_stream_nolink(
TaskSupervisor,
aggregated_subsets_by_node,
fn {node, subsets} ->
P2P.send_message(node, %GetNetworkStats{subsets: subsets})
end,
ordered: false,
on_timeout: :kill_task,
ordered: false
max_concurrency: 256
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.reduce(matrix, fn {:ok, {subset, stats}}, acc ->
|> Stream.filter(fn
{:ok, {:ok, %NetworkStats{stats: stats}}} when map_size(stats) > 0 ->
true

_ ->
false
end)
|> Stream.map(fn {:ok, {:ok, %NetworkStats{stats: stats}}} -> stats end)
|> Enum.flat_map(& &1)
|> Enum.reduce(%{}, fn {subset, stats}, acc ->
Enum.reduce(stats, acc, fn {node, stats}, acc ->
Map.update(acc, subset, %{node => stats}, &Map.put(&1, node, stats))
end)
end)
end

defp update_matrix_from_stats(stats_by_subset, matrix, sorted_node_list) do
Enum.reduce(stats_by_subset, matrix, fn {subset, stats}, acc ->
sampling_nodes = P2PSampling.list_nodes_to_sample(subset)

Enum.reduce(stats, acc, fn {node_public_key, stats}, acc ->
Expand Down Expand Up @@ -330,4 +359,92 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
matrix
end
end

@spec aggregate_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()}
def aggregate_network_stats(subset) do
subset
|> SummaryCache.stream_current_slots()
|> Stream.filter(&match?({%Slot{p2p_view: %{network_stats: [_ | _]}}, _}, &1))
|> Stream.map(fn
{%Slot{p2p_view: %{network_stats: net_stats}}, node} ->
{node, net_stats}
end)
|> Enum.reduce(%{}, fn {node, net_stats}, acc ->
Map.update(acc, node, [net_stats], &(&1 ++ [net_stats]))
end)
|> Enum.map(fn {node, net_stats} ->
aggregated_stats =
net_stats
|> Enum.zip()
|> Enum.map(fn stats ->
aggregated_latency =
stats
|> Tuple.to_list()
|> Enum.map(& &1.latency)
# The logistic regression is used to avoid impact of outliers
# while providing a weighted approach to priorize the latest samples.
|> weighted_logistic_regression()
|> trunc()

%{latency: aggregated_latency}
end)

{node, aggregated_stats}
end)
|> Enum.into(%{})
end

defp weighted_logistic_regression(list) do
%{sum_weight: sum_weight, sum_weighted_list: sum_weighted_list} =
list
|> clean_outliers()
# We want to apply a weight based on the tier of the latency
|> Utils.chunk_list_in(3)
|> weight_list()
|> Enum.reduce(%{sum_weight: 0.0, sum_weighted_list: 0.0}, fn {weight, weighted_list},
acc ->
acc
|> Map.update!(:sum_weighted_list, &(&1 + Enum.sum(weighted_list)))
|> Map.update!(:sum_weight, &(&1 + weight * Enum.count(weighted_list)))
end)

sum_weighted_list / sum_weight
end

defp clean_outliers(list) do
list_size = Enum.count(list)

sorted_list = Enum.sort(list)

# Compute percentiles (P80, P20) to remove the outliers
p1 = (0.8 * list_size) |> trunc()
p2 = (0.2 * list_size) |> trunc()

max = Enum.at(sorted_list, p1)
min = Enum.at(sorted_list, p2)

Enum.map(list, fn
x when x < min ->
min

x when x > max ->
max

x ->
x
end)
end

defp weight_list(list) do
list
|> Enum.with_index()
|> Enum.map(fn {list, i} ->
# Apply weight of the tier
weight = (i + 1) * (1 / 3)

weighted_list = Enum.map(list, &(&1 * weight))

{weight, weighted_list}
end)
end
end
8 changes: 7 additions & 1 deletion lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,13 @@ defmodule Archethic.BeaconChain.Subset do
end

defp handle_summary(time, subset) do
beacon_slots = SummaryCache.stream_current_slots(subset)
beacon_slots =
subset
|> SummaryCache.stream_current_slots()
|> Stream.map(fn
{slot, _} -> slot
slot -> slot
end)

if Enum.empty?(beacon_slots) do
:ok
Expand Down
4 changes: 3 additions & 1 deletion lib/archethic/beacon_chain/subset/summary_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do
Handle the caching of the beacon slots defined for the summary
"""

alias Archethic.BeaconChain
alias Archethic.BeaconChain.Slot
alias Archethic.BeaconChain.SlotTimer
alias Archethic.BeaconChain.SummaryTimer
alias Archethic.Crypto

Expand Down Expand Up @@ -51,7 +53,7 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do
# Check if there are slots for the next summary
contains_next_summary_slots? =
subset
|> SummaryCache.stream_current_slots()
|> stream_current_slots()
|> Enum.any?(fn
{%Slot{slot_time: slot_time}, _} ->
DateTime.diff(slot_time, previous_summary_time) >= 0
Expand Down
Loading

0 comments on commit b881c8c

Please sign in to comment.