Skip to content

Commit

Permalink
Optimize network stats fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Mar 21, 2023
1 parent d3f8d51 commit da77d0e
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 174 deletions.
99 changes: 7 additions & 92 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Archethic.BeaconChain do
alias __MODULE__.Slot.Validation, as: SlotValidation
alias __MODULE__.SlotTimer
alias __MODULE__.Subset
alias __MODULE__.NetworkCoordinates
alias __MODULE__.Subset.P2PSampling
alias __MODULE__.Subset.SummaryCache
alias __MODULE__.Summary
Expand Down Expand Up @@ -194,6 +195,10 @@ defmodule Archethic.BeaconChain do
@spec get_summary_slots(binary()) :: list(TransactionSummary.t())
def get_summary_slots(subset) when is_binary(subset) do
SummaryCache.stream_current_slots(subset)
|> Stream.map(fn
{slot, _} -> slot
slot -> slot
end)
|> Stream.flat_map(fn %Slot{transaction_attestations: transaction_attestations} ->
transaction_summaries =
transaction_attestations
Expand Down Expand Up @@ -419,97 +424,7 @@ defmodule Archethic.BeaconChain do
Retrieve the network stats for a given subset from the cached slots
"""
@spec get_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()}
def get_network_stats(subset) do
  subset
  |> SummaryCache.stream_current_slots()
  # Only the cache entries stored as a 2-tuple `{slot, node}` are considered
  |> Stream.filter(&match?({_, _}, &1))
  |> Stream.map(fn {%Slot{p2p_view: %{network_stats: net_stats}}, node} ->
    {node, net_stats}
  end)
  |> Stream.reject(&match?({_, %{latencies: []}}, &1))
  # Group the stats per node, keeping them in the order the cache streamed them
  |> Enum.group_by(fn {node, _net_stats} -> node end, fn {_node, net_stats} -> net_stats end)
  |> Map.new(fn {node, samples} ->
    # Zip the samples position by position: every position yields one aggregated latency
    aggregated_stats =
      samples
      |> Enum.zip()
      |> Enum.map(fn sample_tuple ->
        latencies =
          sample_tuple
          |> Tuple.to_list()
          |> Enum.map(& &1.latency)

        # Outliers are clamped and the samples weighted by their position
        # in the list before being averaged, then truncated to an integer.
        %{latency: latencies |> weighted_logistic_regression() |> trunc()}
      end)

    {node, aggregated_stats}
  end)
end

# Compute a weighted mean of `list`.
#
# The values are first clamped by `clean_outliers/1`, then split with
# `Utils.chunk_list_in(3)` and weighted per chunk by `weight_list/1`
# (later chunks receive a bigger weight). The result is the sum of the
# weighted values divided by the sum of the applied weights.
defp weighted_logistic_regression(list) do
  {weighted_total, weight_total} =
    list
    |> clean_outliers()
    |> Utils.chunk_list_in(3)
    |> weight_list()
    |> Enum.reduce({0.0, 0.0}, fn {weight, weighted_values}, {weighted_acc, weight_acc} ->
      {
        weighted_acc + Enum.sum(weighted_values),
        # Each value of the chunk contributes its chunk weight to the divisor
        weight_acc + weight * Enum.count(weighted_values)
      }
    end)

  weighted_total / weight_total
end

# Clamp the values of `list` between the elements found at 20% and 80%
# of the sorted list. The outliers are replaced by the nearest bound, not
# dropped, so the size and the order of the input are preserved.
defp clean_outliers(list) do
  ordered = Enum.sort(list)
  size = Enum.count(list)

  # Positions of the upper (P80) and lower (P20) bounds in the sorted list
  upper = Enum.at(ordered, trunc(0.8 * size))
  lower = Enum.at(ordered, trunc(0.2 * size))

  Enum.map(list, fn value ->
    cond do
      value < lower -> lower
      value > upper -> upper
      true -> value
    end
  end)
end

defp weight_list(list) do
list
|> Enum.with_index()
|> Enum.map(fn {list, i} ->
# Apply weight of the tier
weight = (i + 1) * (1 / 3)

weighted_list = Enum.map(list, &(&1 * weight))

{weight, weighted_list}
end)
# Thin entry point: the aggregation of the cached slots is done by
# NetworkCoordinates.aggregate_network_stats/1.
def get_network_stats(subset) when is_binary(subset),
  do: NetworkCoordinates.aggregate_network_stats(subset)
end
205 changes: 174 additions & 31 deletions lib/archethic/beacon_chain/network_coordinates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
@digits ["F", "E", "D", "C", "B", "A", "9", "8", "7", "6", "5", "4", "3", "2", "1", "0"]

alias Archethic.BeaconChain
alias Archethic.BeaconChain.Slot
alias Archethic.BeaconChain.Subset.SummaryCache
alias Archethic.BeaconChain.Subset.P2PSampling

alias Archethic.Crypto

alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.GetNetworkStats
alias Archethic.P2P.Message.NetworkStats

alias Archethic.Utils

alias Archethic.TaskSupervisor

@doc """
Expand Down Expand Up @@ -211,41 +217,82 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do

matrix = Nx.broadcast(0, {nb_nodes, nb_nodes})

Task.async_stream(
BeaconChain.list_subsets(),
fn subset ->
beacon_nodes = Election.beacon_storage_nodes(subset, summary_time, authorized_nodes)

stats =
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
beacon_nodes,
&P2P.send_message(&1, %GetNetworkStats{subset: subset}),
on_timeout: :kill_task,
ordered: false
)
|> Stream.filter(fn
# Filter slots with P2P view
{:ok, {:ok, %NetworkStats{stats: stats}}} when map_size(stats) > 0 ->
true

_ ->
false
end)
|> Stream.map(fn {:ok, {:ok, %NetworkStats{stats: stats}}} -> Map.to_list(stats) end)
|> Enum.to_list()
|> List.flatten()
|> Enum.reduce(%{}, fn {node, stats}, acc ->
Map.put(acc, node, stats)
end)
%{bounded: bounded_subsets, unbounded: unbounded_subsets} =
get_subsets_bounding(summary_time, authorized_nodes)

bounded_netstats =
bounded_subsets
|> Task.async_stream(&{&1, aggregate_network_stats(&1)})
|> Enum.reduce(%{}, fn
{:ok, {subset, stats}}, acc when map_size(stats) > 0 ->
Map.put(acc, subset, stats)

_, acc ->
acc
end)

unbounded_matrix =
unbounded_subsets
# Aggregate subsets by node
|> Enum.reduce(%{}, fn {subset, beacon_nodes}, acc ->
Enum.reduce(beacon_nodes, acc, fn node, acc ->
Map.update(acc, node, [subset], &[subset | &1])
end)
end)
|> stream_subsets_stats()
# Aggregate stats per node to identify the sampling nodes
|> aggregate_stats_per_subset()
|> update_matrix_from_stats(matrix, sorted_node_list)

full_matrix = update_matrix_from_stats(bounded_netstats, unbounded_matrix, sorted_node_list)
full_matrix
end

# Split the beacon subsets in two groups, based on the beacon storage nodes
# elected for `summary_time` among `authorized_nodes`:
# - `:bounded`: subsets for which `Utils.key_in_node_list?/2` finds the first
#   node public key among the elected nodes (only the subset is kept)
# - `:unbounded`: all the other subsets, kept as `{subset, beacon_nodes}`
#
# Entries are prepended, so both lists are in the reverse order of
# `BeaconChain.list_subsets/0`.
defp get_subsets_bounding(summary_time, authorized_nodes) do
  initial_groups = %{bounded: [], unbounded: []}

  BeaconChain.list_subsets()
  |> Enum.reduce(initial_groups, fn subset, %{bounded: bounded, unbounded: unbounded} = groups ->
    beacon_nodes = Election.beacon_storage_nodes(subset, summary_time, authorized_nodes)

    if Utils.key_in_node_list?(beacon_nodes, Crypto.first_node_public_key()) do
      %{groups | bounded: [subset | bounded]}
    else
      %{groups | unbounded: [{subset, beacon_nodes} | unbounded]}
    end
  end)
end

{subset, stats}
defp stream_subsets_stats(subsets_by_node) do
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
subsets_by_node,
fn {node, subsets} ->
P2P.send_message(node, %GetNetworkStats{subsets: subsets})
end,
ordered: false,
on_timeout: :kill_task,
ordered: false
max_concurrency: 256
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.reduce(matrix, fn {:ok, {subset, stats}}, acc ->
|> Stream.filter(fn
{:ok, {:ok, %NetworkStats{stats: stats}}} when map_size(stats) > 0 ->
true

_ ->
false
end)
|> Stream.map(fn {:ok, {:ok, %NetworkStats{stats: stats}}} -> stats end)
end

# Merge a collection of `subset => (node => stats)` enumerables into a single
# map of `subset => %{node => stats}`.
#
# When the same subset/node pair appears several times, the last one wins.
# A subset without any node entry is not added to the result.
defp aggregate_stats_per_subset(stats) do
  Enum.reduce(stats, %{}, fn response, by_subset ->
    Enum.reduce(response, by_subset, fn {subset, node_stats}, by_subset ->
      Enum.reduce(node_stats, by_subset, fn {node, node_stat}, by_subset ->
        Map.update(by_subset, subset, %{node => node_stat}, &Map.put(&1, node, node_stat))
      end)
    end)
  end)
end

defp update_matrix_from_stats(stats_by_subset, matrix, sorted_node_list) do
Enum.reduce(stats_by_subset, matrix, fn {subset, stats}, acc ->
sampling_nodes = P2PSampling.list_nodes_to_sample(subset)

Enum.reduce(stats, acc, fn {node_public_key, stats}, acc ->
Expand Down Expand Up @@ -330,4 +377,100 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
matrix
end
end

@doc """
Aggregate the network stats of a subset from the slots held by the `SummaryCache`.

Only the cache entries shaped as `{slot, node}` whose slot carries a non-empty
`network_stats` list are considered. The stats are grouped by `node`, in the
order the cache streams them, then zipped position by position: each position
produces a single `%{latency: integer}` entry computed from every latency
recorded at that position.

The latencies of a position are combined with a weighted mean in which the
outliers are clamped and the samples are weighted according to their position
in the list.
"""
@spec aggregate_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()}
def aggregate_network_stats(subset) when is_binary(subset) do
  subset
  |> SummaryCache.stream_current_slots()
  |> Stream.filter(&match?({%Slot{p2p_view: %{network_stats: [_ | _]}}, _}, &1))
  |> Enum.group_by(
    fn {_slot, node} -> node end,
    fn {%Slot{p2p_view: %{network_stats: net_stats}}, _node} -> net_stats end
  )
  |> Map.new(fn {node, samples} ->
    aggregated_stats =
      samples
      |> Enum.zip()
      |> Enum.map(fn sample_tuple ->
        latencies =
          sample_tuple
          |> Tuple.to_list()
          |> Enum.map(& &1.latency)

        # The weighted mean limits the impact of the outliers; the result is
        # truncated to keep an integer latency.
        %{latency: latencies |> weighted_logistic_regression() |> trunc()}
      end)

    {node, aggregated_stats}
  end)
end

# Compute a weighted mean of `list`.
#
# The values are first clamped by `clean_outliers/1`, then split with
# `Utils.chunk_list_in(3)` and weighted per chunk by `weight_list/1`
# (later chunks receive a bigger weight). The result is the sum of the
# weighted values divided by the sum of the applied weights.
defp weighted_logistic_regression(list) do
  {weighted_total, weight_total} =
    list
    |> clean_outliers()
    |> Utils.chunk_list_in(3)
    |> weight_list()
    |> Enum.reduce({0.0, 0.0}, fn {weight, weighted_values}, {weighted_acc, weight_acc} ->
      {
        weighted_acc + Enum.sum(weighted_values),
        # Each value of the chunk contributes its chunk weight to the divisor
        weight_acc + weight * Enum.count(weighted_values)
      }
    end)

  weighted_total / weight_total
end

# Clamp the values of `list` between the elements found at 20% and 80%
# of the sorted list. The outliers are replaced by the nearest bound, not
# dropped, so the size and the order of the input are preserved.
defp clean_outliers(list) do
  ordered = Enum.sort(list)
  size = Enum.count(list)

  # Positions of the upper (P80) and lower (P20) bounds in the sorted list
  upper = Enum.at(ordered, trunc(0.8 * size))
  lower = Enum.at(ordered, trunc(0.2 * size))

  Enum.map(list, fn value ->
    cond do
      value < lower -> lower
      value > upper -> upper
      true -> value
    end
  end)
end

# Apply a weight to every chunk of `list` according to its position:
# the chunk at index `i` gets the weight `(i + 1) * (1 / 3)`.
#
# Returns a list of `{weight, weighted_values}` tuples, one per chunk,
# in the same order as the input.
defp weight_list(list) do
  for {chunk, index} <- Enum.with_index(list) do
    weight = (index + 1) * (1 / 3)

    {weight, Enum.map(chunk, &(&1 * weight))}
  end
end
end
8 changes: 7 additions & 1 deletion lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,13 @@ defmodule Archethic.BeaconChain.Subset do
end

defp handle_summary(time, subset) do
beacon_slots = SummaryCache.stream_current_slots(subset)
beacon_slots =
subset
|> SummaryCache.stream_current_slots()
|> Stream.map(fn
{slot, _} -> slot
slot -> slot
end)

if Enum.empty?(beacon_slots) do
:ok
Expand Down
4 changes: 3 additions & 1 deletion lib/archethic/beacon_chain/subset/summary_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do
Handle the caching of the beacon slots defined for the summary
"""

alias Archethic.BeaconChain
alias Archethic.BeaconChain.Slot
alias Archethic.BeaconChain.SlotTimer
alias Archethic.BeaconChain.SummaryTimer
alias Archethic.Crypto

Expand Down Expand Up @@ -51,7 +53,7 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do
# Check if there are slots for the next summary
contains_next_summary_slots? =
subset
|> SummaryCache.stream_current_slots()
|> stream_current_slots()
|> Enum.any?(fn
{%Slot{slot_time: slot_time}, _} ->
DateTime.diff(slot_time, previous_summary_time) >= 0
Expand Down
Loading

0 comments on commit da77d0e

Please sign in to comment.