diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index 49244a6af..fb11af102 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -9,6 +9,7 @@ defmodule Archethic.BeaconChain do alias __MODULE__.Slot.Validation, as: SlotValidation alias __MODULE__.SlotTimer alias __MODULE__.Subset + alias __MODULE__.NetworkCoordinates alias __MODULE__.Subset.P2PSampling alias __MODULE__.Subset.SummaryCache alias __MODULE__.Summary @@ -28,6 +29,7 @@ defmodule Archethic.BeaconChain do alias Archethic.P2P.Message.BeaconSummaryList alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Message.TransactionSummaryList + alias Archethic.TaskSupervisor alias Archethic.TransactionChain.TransactionSummary @@ -110,8 +112,8 @@ defmodule Archethic.BeaconChain do @doc """ Load a slot in summary cache """ - @spec load_slot(Slot.t()) :: :ok | :error - def load_slot(slot = %Slot{subset: subset, slot_time: slot_time}) do + @spec load_slot(Slot.t(), Crypto.key()) :: :ok | :error + def load_slot(slot = %Slot{subset: subset, slot_time: slot_time}, node_public_key) do if slot_time == SlotTimer.previous_slot(DateTime.utc_now()) do Task.Supervisor.start_child(TaskSupervisor, fn -> case validate_slot(slot) do @@ -120,7 +122,7 @@ defmodule Archethic.BeaconChain do beacon_subset: Base.encode16(subset) ) - SummaryCache.add_slot(subset, slot) + SummaryCache.add_slot(subset, slot, node_public_key) {:error, reason} -> Logger.error("Invalid beacon slot - #{inspect(reason)}") @@ -195,6 +197,10 @@ defmodule Archethic.BeaconChain do @spec get_summary_slots(binary()) :: list(TransactionSummary.t()) def get_summary_slots(subset) when is_binary(subset) do SummaryCache.stream_current_slots(subset) + |> Stream.map(fn + {slot, _} -> slot + slot -> slot + end) |> Stream.flat_map(fn %Slot{transaction_attestations: transaction_attestations} -> transaction_summaries = transaction_attestations @@ -252,7 +258,12 @@ defmodule Archethic.BeaconChain do if unsubscribe?, do: Update.unsubscribe() Enum.map(list_subsets(), fn subset -> - nodes = Election.beacon_storage_nodes(subset, date, P2P.authorized_and_available_nodes()) + nodes = + Election.beacon_storage_nodes( + subset, + date, + P2P.authorized_and_available_nodes(date, true) + ) nodes = Enum.reject(nodes, fn node -> node.first_public_key == Crypto.first_node_public_key() end) @@ -324,10 +335,10 @@ defmodule Archethic.BeaconChain do @spec list_transactions_summaries_from_current_slot(DateTime.t()) :: list(TransactionSummary.t()) def list_transactions_summaries_from_current_slot(date = %DateTime{} \\ DateTime.utc_now()) do - authorized_nodes = P2P.authorized_and_available_nodes() - next_summary_date = next_summary_date(DateTime.truncate(date, :millisecond)) + authorized_nodes = P2P.authorized_and_available_nodes(next_summary_date, true) + # get the subsets to request per node list_subsets() |> Enum.map(fn subset -> @@ -472,4 +483,12 @@ defmodule Archethic.BeaconChain do e end end + + @doc """ + Retrieve the network stats for a given subset from the cached slots + """ + @spec get_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()} + def get_network_stats(subset) when is_binary(subset) do + NetworkCoordinates.aggregate_network_stats(subset) + end end diff --git a/lib/archethic/beacon_chain/network_coordinates.ex b/lib/archethic/beacon_chain/network_coordinates.ex index db49d2382..4e947cd31 100644 --- a/lib/archethic/beacon_chain/network_coordinates.ex +++ b/lib/archethic/beacon_chain/network_coordinates.ex @@ -5,7 +5,22 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do @digits ["F", "E", "D", "C", "B", "A", "9", "8", "7", "6", "5", "4", "3", "2", "1", "0"] - @type latency_patch :: {String.t(), String.t()} + alias Archethic.BeaconChain + alias Archethic.BeaconChain.Slot + alias Archethic.BeaconChain.Subset.SummaryCache + alias Archethic.BeaconChain.Subset.P2PSampling + + alias Archethic.Crypto + + alias Archethic.Election + + alias Archethic.P2P + alias Archethic.P2P.Message.GetNetworkStats + alias Archethic.P2P.Message.NetworkStats + + alias Archethic.Utils + + alias Archethic.TaskSupervisor @doc """ Compute the network patch based on the matrix latencies @@ -37,30 +52,27 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do ...> [150, 200, 0] ...> ], names: [:line, :column], type: {:f, 64})) [ - {"B", "8"}, - {"0", "8"}, - {"C", "8"} + "B8", + "08", + "C8" ] """ - @spec get_patch_from_latencies(Nx.Tensor.t()) :: list(latency_patch()) + @spec get_patch_from_latencies(Nx.Tensor.t()) :: list(String.t()) def get_patch_from_latencies(matrix) do - center_mass = compute_distance_from_center_mass(matrix) - gram_matrix = get_gram_matrix(matrix, center_mass) - {x, y} = get_coordinates(gram_matrix) - get_patch_digits(x, y) - end - - # defp get_matrix_tensor() do - # "distance_matrix.dat" - # |> File.read! - # |> String.split("\n", trim: true) - # |> Enum.map(fn line -> - # line - # |> String.split(",", trim: true) - # |> Enum.map(&String.to_float/1) - # end) - # |> Nx.tensor(names: [:line, :column], type: {:f, 64}) - # end + if Nx.size(matrix) > 1 do + formated_matrix = + matrix + |> Nx.as_type(:f64) + |> Nx.rename([:line, :column]) + + center_mass = compute_distance_from_center_mass(formated_matrix) + gram_matrix = get_gram_matrix(formated_matrix, center_mass) + {x, y} = get_coordinates(gram_matrix) + get_patch_digits(x, y) + else + [] + end + end defp compute_distance_from_center_mass(tensor) do matrix_size = Nx.size(tensor[0]) @@ -170,7 +182,7 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do end defp get_patch(x_elem, y_elem, v, max) do - %{x: x_patch, y: y_patch} = + %{x: x, y: y} = Enum.reduce_while(0..15, %{x: "", y: ""}, fn j, acc -> if acc.x != "" and acc.y != "" do {:halt, acc} @@ -184,7 +196,7 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do end end) - {x_patch, y_patch} + "#{x}#{y}" end defp get_digit(acc, coord_name, coord, digit_index, max, v) @@ -193,4 +205,282 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do end defp get_digit(acc, _, _, _, _, _), do: acc + + @doc """ + Fetch remotely the network stats for a given summary time + + This request all the subsets to gather the aggregated network stats. + A NxN latency matrix is then computed based on the network stats origins and targets + """ + @spec fetch_network_stats(DateTime.t()) :: Nx.Tensor.t() + def fetch_network_stats(summary_time = %DateTime{}) do + authorized_nodes = P2P.authorized_and_available_nodes(summary_time, true) + + sorted_node_list = P2P.list_nodes() |> Enum.sort_by(& &1.first_public_key) + nb_nodes = length(sorted_node_list) + + matrix = Nx.broadcast(0, {nb_nodes, nb_nodes}) + + summary_time + |> get_subsets_nodes(authorized_nodes) + # Aggregate subsets by node + |> Enum.reduce(%{}, fn {subset, beacon_nodes}, acc -> + Enum.reduce(beacon_nodes, acc, fn node, acc -> + Map.update(acc, node, [subset], &[subset | &1]) + end) + end) + |> stream_subsets_stats() + # Aggregate stats per node to identify the sampling nodes + |> aggregate_stats_per_subset() + |> update_matrix_from_stats(matrix, sorted_node_list) + end + + defp get_subsets_nodes(summary_time, authorized_nodes) do + Enum.map(BeaconChain.list_subsets(), fn subset -> + beacon_nodes = Election.beacon_storage_nodes(subset, summary_time, authorized_nodes) + {subset, beacon_nodes} + end) + end + + defp stream_subsets_stats(subsets_by_node) do + Task.Supervisor.async_stream_nolink( + TaskSupervisor, + subsets_by_node, + fn {node, subsets} -> + P2P.send_message(node, %GetNetworkStats{subsets: subsets}) + end, + ordered: false, + on_timeout: :kill_task, + max_concurrency: 256 + ) + |> Stream.filter(fn + {:ok, {:ok, %NetworkStats{stats: stats}}} when map_size(stats) > 0 -> + true + + _ -> + false + end) + |> Stream.map(fn {:ok, {:ok, %NetworkStats{stats: stats}}} -> stats end) + end + + defp aggregate_stats_per_subset(stats) do + stats + |> Enum.flat_map(& &1) + |> Enum.reduce(%{}, fn {subset, stats}, acc -> + Enum.reduce(stats, acc, fn {node, stats}, acc -> + Map.update( + acc, + subset, + %{node => [stats]}, + &Map.update(&1, node, [stats], fn prev_stats -> [stats | prev_stats] end) + ) + end) + end) + |> Enum.reduce(%{}, fn {subset, stats_by_node}, acc -> + aggregated_stats_by_node = + Enum.reduce(stats_by_node, %{}, fn {node, stats}, acc -> + Map.put(acc, node, aggregate_stats(stats)) + end) + + Map.put(acc, subset, aggregated_stats_by_node) + end) + end + + defp aggregate_stats(stats) do + stats + |> Enum.zip() + |> Enum.map(fn stats -> + latency = + stats + |> Tuple.to_list() + |> Enum.map(& &1.latency) + |> Utils.mean() + |> trunc() + + %{latency: latency} + end) + end + + defp update_matrix_from_stats(stats_by_subset, matrix, sorted_node_list) do + Enum.reduce(stats_by_subset, matrix, fn {subset, stats}, acc -> + sampling_nodes = P2PSampling.list_nodes_to_sample(subset) + + Enum.reduce(stats, acc, fn {node_public_key, stats}, acc -> + beacon_node_index = + Enum.find_index(sorted_node_list, &(&1.first_public_key == node_public_key)) + + set_matrix_latency(acc, beacon_node_index, sampling_nodes, sorted_node_list, stats) + end) + end) + end + + defp set_matrix_latency( + matrix, + beacon_node_index, + sampling_nodes, + sorted_node_list, + stats + ) do + stats + |> Enum.with_index() + |> Enum.reduce(matrix, fn {%{latency: latency}, index}, acc -> + sample_node = Enum.at(sampling_nodes, index) + + sample_node_index = + Enum.find_index( + sorted_node_list, + &(&1.first_public_key == sample_node.first_public_key) + ) + + # Avoid update if it's the matrix diagonal + if sample_node_index == beacon_node_index do + acc + else + update_matrix(acc, beacon_node_index, sample_node_index, latency) + end + end) + end + + defp update_matrix(matrix, beacon_node_index, sample_node_index, latency) do + existing_points = + matrix + |> Nx.gather( + Nx.tensor([ + [beacon_node_index, sample_node_index], + [sample_node_index, beacon_node_index] + ]) + ) + |> Nx.to_list() + + # Build symmetric matrix + case existing_points do + # Initialize cell + [0, 0] -> + Nx.indexed_put( + matrix, + Nx.tensor([ + [beacon_node_index, sample_node_index], + [sample_node_index, beacon_node_index] + ]), + Nx.tensor([latency, latency]) + ) + + # Take mean when latency differs + [x, y] when (x >= 0 or y >= 0) and (latency != x or latency != y) -> + mean_latency = + if x > 0 do + Archethic.Utils.mean([x, latency]) |> trunc() + else + Archethic.Utils.mean([latency, y]) |> trunc() + end + + Nx.indexed_put( + matrix, + Nx.tensor([ + [beacon_node_index, sample_node_index], + [sample_node_index, beacon_node_index] + ]), + Nx.tensor([mean_latency, mean_latency]) + ) + + [^latency, ^latency] -> + matrix + end + end + + @doc """ + Aggregate the network stats from the SummaryCache + + The summary cache holds the slots of the current summary identified by beacon node. + Hence we can aggregate the view of one particular beacon node regarding the nodes sampled. + + The aggregation is using some weighted logistic regression. + """ + @spec aggregate_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()} + def aggregate_network_stats(subset) when is_binary(subset) do + subset + |> SummaryCache.stream_current_slots() + |> Stream.filter(&match?({%Slot{p2p_view: %{network_stats: [_ | _]}}, _}, &1)) + |> Stream.map(fn + {%Slot{p2p_view: %{network_stats: net_stats}}, node} -> + {node, net_stats} + end) + |> Enum.reduce(%{}, fn {node, net_stats}, acc -> + Map.update(acc, node, [net_stats], &(&1 ++ [net_stats])) + end) + |> Enum.map(fn {node, net_stats} -> + aggregated_stats = + net_stats + |> Enum.zip() + |> Enum.map(fn stats -> + aggregated_latency = + stats + |> Tuple.to_list() + |> Enum.map(& &1.latency) + # The logistic regression is used to avoid impact of outliers + # while providing a weighted approach to priorize the latest samples. + |> weighted_logistic_regression() + |> trunc() + + %{latency: aggregated_latency} + end) + + {node, aggregated_stats} + end) + |> Enum.into(%{}) + end + + defp weighted_logistic_regression(list) do + %{sum_weight: sum_weight, sum_weighted_list: sum_weighted_list} = + list + |> clean_outliers() + # We want to apply a weight based on the tier of the latency + |> Utils.chunk_list_in(3) + |> weight_list() + |> Enum.reduce(%{sum_weight: 0.0, sum_weighted_list: 0.0}, fn {weight, weighted_list}, + acc -> + acc + |> Map.update!(:sum_weighted_list, &(&1 + Enum.sum(weighted_list))) + |> Map.update!(:sum_weight, &(&1 + weight * Enum.count(weighted_list))) + end) + + sum_weighted_list / sum_weight + end + + defp clean_outliers(list) do + list_size = Enum.count(list) + + sorted_list = Enum.sort(list) + + # Compute percentiles (P80, P20) to remove the outliers + p1 = (0.8 * list_size) |> trunc() + p2 = (0.2 * list_size) |> trunc() + + max = Enum.at(sorted_list, p1) + min = Enum.at(sorted_list, p2) + + Enum.map(list, fn + x when x < min -> + min + + x when x > max -> + max + + x -> + x + end) + end + + defp weight_list(list) do + list + |> Enum.with_index() + |> Enum.map(fn {list, i} -> + # Apply weight of the tier + weight = (i + 1) * (1 / 3) + + weighted_list = Enum.map(list, &(&1 * weight)) + + {weight, weighted_list} + end) + end end diff --git a/lib/archethic/beacon_chain/slot.ex b/lib/archethic/beacon_chain/slot.ex index 5087a47bd..b5b226b96 100644 --- a/lib/archethic/beacon_chain/slot.ex +++ b/lib/archethic/beacon_chain/slot.ex @@ -606,7 +606,7 @@ defmodule Archethic.BeaconChain.Slot do """ @spec involved_nodes(t()) :: list(Node.t()) def involved_nodes(%__MODULE__{subset: subset, slot_time: slot_time}) do - node_list = P2P.authorized_and_available_nodes(slot_time) + node_list = P2P.authorized_and_available_nodes(slot_time, true) Election.beacon_storage_nodes( subset, diff --git a/lib/archethic/beacon_chain/slot_timer.ex b/lib/archethic/beacon_chain/slot_timer.ex index 4abf4f773..ef0614ba7 100644 --- a/lib/archethic/beacon_chain/slot_timer.ex +++ b/lib/archethic/beacon_chain/slot_timer.ex @@ -15,7 +15,6 @@ defmodule Archethic.BeaconChain.SlotTimer do alias Archethic.Crypto alias Archethic.P2P - alias Archethic.P2P.Node alias Archethic.PubSub alias Archethic.Utils @@ -137,17 +136,13 @@ defmodule Archethic.BeaconChain.SlotTimer do if SummaryTimer.match_interval?(slot_time) do # We clean the previously stored summaries - The retention time is for a self repair cycle - # as the aggregates will be handle for long term storage. + # as the aggregates will be handled for long term storage. DB.clear_beacon_summaries() end - case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do - %Node{authorized?: true, available?: true} -> - Logger.debug("Trigger beacon slots creation at #{Utils.time_to_string(slot_time)}") - Enum.each(list_subset_processes(), &send(&1, {:create_slot, slot_time})) - - _ -> - :skip + if P2P.authorized_and_available_node?(Crypto.first_node_public_key(), slot_time) do + Logger.debug("Trigger beacon slots creation at #{Utils.time_to_string(slot_time)}") + Enum.each(list_subset_processes(), &send(&1, {:create_slot, slot_time})) end next_time = next_slot(next_time) diff --git a/lib/archethic/beacon_chain/subset.ex b/lib/archethic/beacon_chain/subset.ex index 70d4e57fc..d167b2763 100644 --- a/lib/archethic/beacon_chain/subset.ex +++ b/lib/archethic/beacon_chain/subset.ex @@ -5,6 +5,7 @@ defmodule Archethic.BeaconChain.Subset do """ alias Archethic.BeaconChain + alias Archethic.BeaconChain.NetworkCoordinates alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.BeaconChain.Slot alias Archethic.BeaconChain.Slot.EndOfNodeSync @@ -14,6 +15,7 @@ defmodule Archethic.BeaconChain.Subset do alias __MODULE__.P2PSampling alias __MODULE__.SummaryCache + alias __MODULE__.StatsCollector alias Archethic.BeaconChain.SubsetRegistry @@ -288,12 +290,14 @@ defmodule Archethic.BeaconChain.Subset do tx_summary_message = TransactionSummaryMessage.from_transaction_summary(tx_summary) + next_slot_time = BeaconChain.next_slot(timestamp) + # Do not notify beacon storage nodes as they are already aware of the transaction beacon_storage_nodes = Election.beacon_storage_nodes( BeaconChain.subset_from_address(address), - BeaconChain.next_slot(timestamp), - P2P.authorized_and_available_nodes(timestamp) + next_slot_time, + P2P.authorized_and_available_nodes(next_slot_time, true) ) |> Enum.map(& &1.first_public_key) @@ -315,7 +319,7 @@ defmodule Archethic.BeaconChain.Subset do current_slot = %{current_slot | slot_time: time} if summary_time?(time) do - SummaryCache.add_slot(subset, current_slot) + SummaryCache.add_slot(subset, current_slot, Crypto.first_node_public_key()) else next_summary_time = SummaryTimer.next_summary(time) broadcast_beacon_slot(subset, next_summary_time, current_slot) @@ -354,23 +358,64 @@ defmodule Archethic.BeaconChain.Subset do end defp handle_summary(time, subset) do - beacon_slots = SummaryCache.pop_slots(subset) + beacon_slots = + subset + |> SummaryCache.stream_current_slots() + |> Stream.map(fn + {slot, _} -> slot + slot -> slot + end) if Enum.empty?(beacon_slots) do :ok else - Logger.debug("Create beacon summary with #{inspect(beacon_slots, limit: :infinity)}", - beacon_subset: Base.encode16(subset) - ) + Logger.debug("Create beacon summary", beacon_subset: Base.encode16(subset)) + + patch_task = Task.async(fn -> get_network_patches(subset, time) end) summary = %Summary{ subset: subset, summary_time: Utils.truncate_datetime(time, second?: true, microsecond?: true) } - |> Summary.aggregate_slots(beacon_slots, P2PSampling.list_nodes_to_sample(subset)) + |> Summary.aggregate_slots( + beacon_slots, + P2PSampling.list_nodes_to_sample(subset) + ) - BeaconChain.write_beacon_summary(summary) + network_patches = Task.await(patch_task) + BeaconChain.write_beacon_summary(%{summary | network_patches: network_patches}) + + :ok + end + end + + defp get_network_patches(subset, summary_time) do + with true <- length(P2P.authorized_and_available_nodes()) > 1, + sampling_nodes when sampling_nodes != [] <- P2PSampling.list_nodes_to_sample(subset) do + sampling_nodes_indexes = + P2P.list_nodes() + |> Enum.sort_by(& &1.first_public_key) + |> Enum.with_index() + |> Enum.filter(fn {node, _index} -> + Utils.key_in_node_list?(sampling_nodes, node.first_public_key) + end) + |> Enum.map(fn {_, index} -> index end) + + summary_time + |> StatsCollector.get() + |> NetworkCoordinates.get_patch_from_latencies() + |> Enum.with_index() + |> Enum.filter(fn {_, index} -> + index in sampling_nodes_indexes + end) + |> Enum.map(fn {patch, _} -> + # TODO: define the bandwidth digits. We take "A" as medium value for now. + "#{patch}A" + end) + else + _ -> + [] end end @@ -385,7 +430,7 @@ defmodule Archethic.BeaconChain.Subset do end defp beacon_summary_node?(subset, summary_time, node_public_key) do - node_list = P2P.authorized_and_available_nodes(summary_time) + node_list = P2P.authorized_and_available_nodes(summary_time, true) Election.beacon_storage_nodes( subset, diff --git a/lib/archethic/beacon_chain/subset/stats_collector.ex b/lib/archethic/beacon_chain/subset/stats_collector.ex new file mode 100644 index 000000000..7a48af937 --- /dev/null +++ b/lib/archethic/beacon_chain/subset/stats_collector.ex @@ -0,0 +1,65 @@ +defmodule Archethic.BeaconChain.Subset.StatsCollector do + @moduledoc """ + Process responsible to collect subset network stats + and reply to parallels requests to reduce the network load. + """ + + @vsn Mix.Project.config()[:version] + use GenServer + + alias Archethic.BeaconChain.NetworkCoordinates + + require Logger + + def start_link(_arg \\ []) do + GenServer.start_link(__MODULE__, [], name: __MODULE__) + end + + @spec get(DateTime.t()) :: Nx.Tensor.t() + def get(summary_time) do + try do + GenServer.call(__MODULE__, {:get, summary_time}) + catch + :exit, {:timeout, _} -> + Logger.warning("Fetching network stats take longer than 5s") + Nx.tensor(0) + end + end + + def init(_) do + {:ok, %{fetching_task: nil, clients: []}} + end + + def handle_call({:get, summary_time}, from, state = %{fetching_task: nil}) do + task = Task.async(fn -> NetworkCoordinates.fetch_network_stats(summary_time) end) + + new_state = + state + |> Map.update!(:clients, &[from | &1]) + |> Map.put(:fetching_task, task) + + {:noreply, new_state} + end + + def handle_call({:get, _summary_time}, from, state = %{fetching_task: _}) do + new_state = + state + |> Map.update!(:clients, &[from | &1]) + + {:noreply, new_state} + end + + def handle_info({ref, stats}, state = %{clients: clients, fetching_task: %Task{ref: ref_task}}) + when ref_task == ref do + Enum.each(clients, &GenServer.reply(&1, stats)) + + new_state = + state + |> Map.put(:clients, []) + |> Map.put(:fetching_task, nil) + + {:noreply, new_state} + end + + def handle_info({:DOWN, _ref, :process, _pid, _reason}, state), do: {:noreply, state} +end diff --git a/lib/archethic/beacon_chain/subset/summary_cache.ex b/lib/archethic/beacon_chain/subset/summary_cache.ex index 06525621b..72328dbd0 100644 --- a/lib/archethic/beacon_chain/subset/summary_cache.ex +++ b/lib/archethic/beacon_chain/subset/summary_cache.ex @@ -3,13 +3,17 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do Handle the caching of the beacon slots defined for the summary """ + alias Archethic.BeaconChain alias Archethic.BeaconChain.Slot + alias Archethic.BeaconChain.SlotTimer + alias Archethic.BeaconChain.SummaryTimer + alias Archethic.Crypto + + alias Archethic.PubSub alias Archethic.Utils alias Archethic.Utils.VarInt - alias Archethic.BeaconChain.SummaryTimer - use GenServer @vsn Mix.Project.config()[:version] @@ -27,18 +31,63 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do read_concurrency: true ]) - :ok = recover_slots() + :ok = recover_slots(SummaryTimer.next_summary(DateTime.utc_now())) + + PubSub.register_to_current_epoch_of_slot_time() {:ok, %{}} end + def handle_info({:current_epoch_of_slot_timer, slot_time}, state) do + # Check if the slot in the first one of the summary interval + previous_summary_time = SummaryTimer.previous_summary(slot_time) + first_slot_time = SlotTimer.next_slot(previous_summary_time) + + if slot_time == first_slot_time do + Enum.each( + BeaconChain.list_subsets(), + &clean_previous_summary_cache(&1, previous_summary_time) + ) + + File.rm(recover_path(previous_summary_time)) + end + + {:noreply, state} + end + + def code_change("1.0.7", state, _extra) do + next_summary_time = SummaryTimer.next_summary(DateTime.utc_now()) + File.rename("slot_backup", "slot_backup-#{DateTime.to_unix(next_summary_time)}") + PubSub.register_to_current_epoch_of_slot_time() + {:ok, state} + end + + def code_change(_, state, _), do: {:ok, state} + + defp clean_previous_summary_cache(subset, previous_summary_time) do + subset + |> stream_current_slots() + |> Stream.filter(fn + {%Slot{slot_time: slot_time}, _} -> + DateTime.compare(slot_time, previous_summary_time) == :lt + + %Slot{slot_time: slot_time} -> + DateTime.compare(slot_time, previous_summary_time) == :lt + end) + |> Stream.each(fn item -> + :ets.delete_object(@table_name, {subset, item}) + end) + |> Stream.run() + end + @doc """ Stream all the entries for a subset """ - @spec stream_current_slots(subset :: binary()) :: Enumerable.t() | list(Slot.t()) + @spec stream_current_slots(subset :: binary()) :: + Enumerable.t() | list({Slot.t(), Crypto.key()}) def stream_current_slots(subset) do # generate match pattern - # :ets.fun2ms(fn {cle, value} when cle == subset -> value end) + # :ets.fun2ms(fn {key, value} when key == subset -> value end) match_pattern = [{{:"$1", :"$2"}, [{:==, :"$1", subset}], [:"$2"]}] Stream.resource( @@ -56,58 +105,55 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do {slot, :ets.select(continuation)} end - @doc """ - Extract all the entries in the cache - """ - @spec pop_slots(subset :: binary()) :: list(Slot.t()) - def pop_slots(subset) do - recover_path() |> File.rm() - - :ets.take(@table_name, subset) - |> Enum.map(fn {_, slot} -> - slot - end) - end - @doc """ Add new beacon slots to the summary's cache """ - @spec add_slot(subset :: binary(), Slot.t()) :: :ok - def add_slot(subset, slot = %Slot{}) do - true = :ets.insert(@table_name, {subset, slot}) - backup_slot(slot) + @spec add_slot(subset :: binary(), Slot.t(), Crypto.key()) :: :ok + def add_slot(subset, slot = %Slot{}, node_public_key) do + true = :ets.insert(@table_name, {subset, {slot, node_public_key}}) + backup_slot(slot, node_public_key) end - defp recover_path(), do: Utils.mut_dir("slot_backup") + defp recover_path(summary_time = %DateTime{}), + do: Utils.mut_dir("slot_backup-#{DateTime.to_unix(summary_time)}") - defp backup_slot(slot) do - content = serialize(slot) + defp backup_slot(slot = %Slot{slot_time: slot_time}, node_public_key) do + content = serialize(slot, node_public_key) + next_summary_time = SummaryTimer.next_summary(slot_time) - recover_path() + next_summary_time + |> recover_path() |> File.write!(content, [:append, :binary]) end - defp recover_slots() do - if File.exists?(recover_path()) do + defp recover_slots(summary_time) do + if File.exists?(recover_path(summary_time)) do next_summary_time = DateTime.utc_now() |> SummaryTimer.next_summary() |> DateTime.to_unix() - content = File.read!(recover_path()) + content = File.read!(recover_path(summary_time)) deserialize(content, []) - |> Enum.each(fn {summary_time, slot = %Slot{subset: subset}} -> - if summary_time == next_summary_time, do: true = :ets.insert(@table_name, {subset, slot}) + |> Enum.each(fn + {summary_time, slot = %Slot{subset: subset}, node_public_key} -> + if summary_time == next_summary_time, + do: true = :ets.insert(@table_name, {subset, {slot, node_public_key}}) + + # Backward compatibility + {summary_time, slot = %Slot{subset: subset}} -> + if summary_time == next_summary_time, + do: true = :ets.insert(@table_name, {subset, slot}) end) else :ok end end - defp serialize(slot = %Slot{slot_time: slot_time}) do + defp serialize(slot = %Slot{slot_time: slot_time}, node_public_key) do summary_time = SummaryTimer.next_summary(slot_time) |> DateTime.to_unix() slot_bin = Slot.serialize(slot) |> Utils.wrap_binary() slot_size = byte_size(slot_bin) |> VarInt.from_value() - <> + <> end defp deserialize(<<>>, acc), do: acc @@ -118,6 +164,13 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do <> = rest {slot, _} = Slot.deserialize(slot_bin) - deserialize(rest, [{summary_time, slot} | acc]) + # Backward compatibility + try do + {node_public_key, rest} = Utils.deserialize_public_key(rest) + deserialize(rest, [{summary_time, slot, node_public_key} | acc]) + catch + _ -> + deserialize(rest, [{summary_time, slot} | acc]) + end end end diff --git a/lib/archethic/beacon_chain/subset/supervisor.ex b/lib/archethic/beacon_chain/subset/supervisor.ex index a2e1d5c45..6a666bd27 100644 --- a/lib/archethic/beacon_chain/subset/supervisor.ex +++ b/lib/archethic/beacon_chain/subset/supervisor.ex @@ -6,6 +6,7 @@ defmodule Archethic.BeaconChain.SubsetSupervisor do alias Archethic.BeaconChain alias Archethic.BeaconChain.Subset alias Archethic.BeaconChain.Subset.SummaryCache + alias Archethic.BeaconChain.Subset.StatsCollector alias Archethic.Utils @@ -16,11 +17,13 @@ defmodule Archethic.BeaconChain.SubsetSupervisor do def init(_) do subset_children = subset_child_specs(BeaconChain.list_subsets()) - children = [ - {Registry, - keys: :unique, name: BeaconChain.SubsetRegistry, partitions: System.schedulers_online()} - | Utils.configurable_children([SummaryCache | subset_children]) - ] + children = + [ + {Registry, + keys: :unique, name: BeaconChain.SubsetRegistry, partitions: System.schedulers_online()} + | Utils.configurable_children([SummaryCache | subset_children]) + ] + |> Enum.concat([StatsCollector]) Supervisor.init(children, strategy: :one_for_one) end diff --git a/lib/archethic/beacon_chain/summary.ex b/lib/archethic/beacon_chain/summary.ex index 9936a69c8..b4a1b5a24 100644 --- a/lib/archethic/beacon_chain/summary.ex +++ b/lib/archethic/beacon_chain/summary.ex @@ -30,7 +30,8 @@ defmodule Archethic.BeaconChain.Summary do node_availabilities: <<>>, node_average_availabilities: [], end_of_node_synchronizations: [], - version: 1 + network_patches: [], + version: 2 ] @type t :: %__MODULE__{ @@ -41,6 +42,7 @@ defmodule Archethic.BeaconChain.Summary do node_availabilities: bitstring(), node_average_availabilities: list(float()), end_of_node_synchronizations: list(Crypto.key()), + network_patches: list(binary()), version: pos_integer() } @@ -371,12 +373,13 @@ defmodule Archethic.BeaconChain.Summary do ...> node_availabilities: <<1::1, 1::1>>, ...> node_average_availabilities: [1.0, 1.0], ...> end_of_node_synchronizations: [<<0, 1, 190, 20, 188, 141, 156, 135, 91, 37, 96, 187, 27, 24, 41, 130, 118, - ...> 93, 43, 240, 229, 97, 227, 194, 31, 97, 228, 78, 156, 194, 154, 74, 160, 104>>] + ...> 93, 43, 240, 229, 97, 227, 194, 31, 97, 228, 78, 156, 194, 154, 74, 160, 104>>], + ...> network_patches: ["A0C", "0EF"] ...> } ...> |> Summary.serialize() << # Version - 1, + 2, # Subset 0, # Summary time @@ -425,7 +428,13 @@ defmodule Archethic.BeaconChain.Summary do 0, 1, 190, 20, 188, 141, 156, 135, 91, 37, 96, 187, 27, 24, 41, 130, 118, 93, 43, 240, 229, 97, 227, 194, 31, 97, 228, 78, 156, 194, 154, 74, 160, 104, # Availability adding time - 3, 132 + 3, 132, + # Nb patches + 1, 2, + # A0C patch + "A0C", + # 0EF patch + "0EF" >> """ @spec serialize(t()) :: bitstring() @@ -437,7 +446,8 @@ defmodule Archethic.BeaconChain.Summary do node_availabilities: node_availabilities, node_average_availabilities: node_average_availabilities, end_of_node_synchronizations: end_of_node_synchronizations, - availability_adding_time: availability_adding_time + availability_adding_time: availability_adding_time, + network_patches: network_patches }) do transaction_attestations_bin = transaction_attestations @@ -458,11 +468,15 @@ defmodule Archethic.BeaconChain.Summary do encoded_end_of_node_synchronizations_len = length(end_of_node_synchronizations) |> VarInt.from_value() + network_patches_len = network_patches |> length() |> VarInt.from_value() + network_patches_bin = :erlang.list_to_binary(network_patches) + <> + end_of_node_synchronizations_bin::binary, availability_adding_time::16, + network_patches_len::binary, network_patches_bin::binary>> end @doc """ @@ -482,34 +496,79 @@ defmodule Archethic.BeaconChain.Summary do ...> 93, 43, 240, 229, 97, 227, 194, 31, 97, 228, 78, 156, 194, 154, 74, 160, 104, 3, 132 >> ...> |> Summary.deserialize() { - %Summary{ - subset: <<0>>, - summary_time: ~U[2021-01-20 00:00:00Z], - availability_adding_time: 900, - transaction_attestations: [ - %ReplicationAttestation{ - transaction_summary: %TransactionSummary{ - address: <<0, 0, 234, 233, 156, 155, 114, 241, 116, 246, 27, 130, 162, 205, 249, 65, 232, 166, - 99, 207, 133, 252, 112, 223, 41, 12, 206, 162, 233, 28, 49, 204, 255, 12>>, - timestamp: ~U[2020-06-25 15:11:53.000Z], - type: :transfer, - movements_addresses: [], - fee: 10_000_000, - validation_stamp_checksum: <<17, 8, 18, 246, 127, 161, 225, 240, 17, 127, 111, 61, 112, 36, 28, 26, 66, - 167, 176, 119, 17, 169, 60, 36, 119, 204, 81, 109, 144, 66, 249, 219>> - }, - confirmations: [{0, <<255, 120, 232, 52, 141, 15, 97, 213, 231, 93, 242, 160, 123, 25, 192, 3, 133, - 170, 197, 102, 148, 208, 119, 130, 225, 102, 130, 96, 223, 61, 36, 76, 229, - 210, 5, 142, 79, 249, 177, 51, 15, 45, 45, 141, 217, 85, 77, 146, 199, 126, - 213, 205, 108, 164, 167, 112, 201, 194, 113, 133, 242, 104, 254, 253>>}] - } - ], - node_availabilities: <<1::1, 1::1>>, - node_average_availabilities: [1.0, 1.0], - end_of_node_synchronizations: [<<0, 1, 190, 20, 188, 141, 156, 135, 91, 37, 96, 187, 27, 24, 41, 130, 118, - 93, 43, 240, 229, 97, 227, 194, 31, 97, 228, 78, 156, 194, 154, 74, 160, 104>>] - }, - "" + %Summary{ + version: 1, + subset: <<0>>, + summary_time: ~U[2021-01-20 00:00:00Z], + availability_adding_time: 900, + transaction_attestations: [ + %ReplicationAttestation{ + transaction_summary: %TransactionSummary{ + address: <<0, 0, 234, 233, 156, 155, 114, 241, 116, 246, 27, 130, 162, 205, 249, 65, 232, 166, + 99, 207, 133, 252, 112, 223, 41, 12, 206, 162, 233, 28, 49, 204, 255, 12>>, + timestamp: ~U[2020-06-25 15:11:53.000Z], + type: :transfer, + movements_addresses: [], + fee: 10_000_000, + validation_stamp_checksum: <<17, 8, 18, 246, 127, 161, 225, 240, 17, 127, 111, 61, 112, 36, 28, 26, 66, + 167, 176, 119, 17, 169, 60, 36, 119, 204, 81, 109, 144, 66, 249, 219>> + }, + confirmations: [{0, <<255, 120, 232, 52, 141, 15, 97, 213, 231, 93, 242, 160, 123, 25, 192, 3, 133, + 170, 197, 102, 148, 208, 119, 130, 225, 102, 130, 96, 223, 61, 36, 76, 229, + 210, 5, 142, 79, 249, 177, 51, 15, 45, 45, 141, 217, 85, 77, 146, 199, 126, + 213, 205, 108, 164, 167, 112, 201, 194, 113, 133, 242, 104, 254, 253>>}] + } + ], + node_availabilities: <<1::1, 1::1>>, + node_average_availabilities: [1.0, 1.0], + end_of_node_synchronizations: [<<0, 1, 190, 20, 188, 141, 156, 135, 91, 37, 96, 187, 27, 24, 41, 130, 118, + 93, 43, 240, 229, 97, 227, 194, 31, 97, 228, 78, 156, 194, 154, 74, 160, 104>>] + }, + "" + } + + iex> << 2, 0, 96, 7, 114, 128, 1, 1, 1, 1, 0, 0, 234, 233, 156, 155, 114, 241, 116, 246, 27, 130, 162, 205, 249, 65, 232, 166, + ...> 99, 207, 133, 252, 112, 223, 41, 12, 206, 162, 233, 28, 49, 204, 255, 12, 0, 0, 1, 114, 236, 9, 2, 168, + ...> 253, 0, 0, 0, 0, 0, 152, 150, 128, 1, 0, 17, 8, 18, 246, 127, 161, 225, 240, 17, 127, 111, 61, 112, 36, 28, 26, 66, + ...> 167, 176, 119, 17, 169, 60, 36, 119, 204, 81, 109, 144, 66, 249, 219, 1, 0, 64, + ...> 255, 120, 232, 52, 141, 15, 97, 213, 231, 93, 242, 160, 123, 25, 192, 3, 133, + ...> 170, 197, 102, 148, 208, 119, 130, 225, 102, 130, 96, 223, 61, 36, 76, 229, + ...> 210, 5, 142, 79, 249, 177, 51, 15, 45, 45, 141, 217, 85, 77, 146, 199, 126, + ...> 213, 205, 108, 164, 167, 112, 201, 194, 113, 133, 242, 104, 254, 253, 0, 2, 1::1, 1::1, 100, 100, 1, 1, + ...> 0, 1, 190, 20, 188, 141, 156, 135, 91, 37, 96, 187, 27, 24, 41, 130, 118, + ...> 93, 43, 240, 229, 97, 227, 194, 31, 97, 228, 78, 156, 194, 154, 74, 160, 104, 3, 132, 1, 2, "A0C", "0EF">> + ...> |> Summary.deserialize() + { + %Summary{ + version: 2, + subset: <<0>>, + summary_time: ~U[2021-01-20 00:00:00Z], + availability_adding_time: 900, + transaction_attestations: [ + %ReplicationAttestation{ + transaction_summary: %TransactionSummary{ + address: <<0, 0, 234, 233, 156, 155, 114, 241, 116, 246, 27, 130, 162, 205, 249, 65, 232, 166, + 99, 207, 133, 252, 112, 223, 41, 12, 206, 162, 233, 28, 49, 204, 255, 12>>, + timestamp: ~U[2020-06-25 15:11:53.000Z], + type: :transfer, + movements_addresses: [], + fee: 10_000_000, + validation_stamp_checksum: <<17, 8, 18, 246, 127, 161, 225, 240, 17, 127, 111, 61, 112, 36, 28, 26, 66, + 167, 176, 119, 17, 169, 60, 36, 119, 204, 81, 109, 144, 66, 249, 219>> + }, + confirmations: [{0, <<255, 120, 232, 52, 141, 15, 97, 213, 231, 93, 242, 160, 123, 25, 192, 3, 133, + 170, 197, 102, 148, 208, 119, 130, 225, 102, 130, 96, 223, 61, 36, 76, 229, + 210, 5, 142, 79, 249, 177, 51, 15, 45, 45, 141, 217, 85, 77, 146, 199, 126, + 213, 205, 108, 164, 167, 112, 201, 194, 113, 133, 242, 104, 254, 253>>}] + } + ], + node_availabilities: <<1::1, 1::1>>, + node_average_availabilities: [1.0, 1.0], + end_of_node_synchronizations: [<<0, 1, 190, 20, 188, 141, 156, 135, 91, 37, 96, 187, 27, 24, 41, 130, 118, + 93, 43, 240, 229, 97, 227, 194, 31, 97, 228, 78, 156, 194, 154, 74, 160, 104>>], + network_patches: ["A0C", "0EF"] + }, + "" } """ @spec deserialize(bitstring()) :: {t(), bitstring()} @@ -538,7 +597,47 @@ defmodule Archethic.BeaconChain.Summary do transaction_attestations: transaction_attestations, node_availabilities: availabilities, node_average_availabilities: node_average_availabilities, - end_of_node_synchronizations: end_of_node_synchronizations + end_of_node_synchronizations: end_of_node_synchronizations, + version: 1 + }, rest} + end + + def deserialize(<<2::8, subset::8, summary_timestamp::32, rest::bitstring>>) do + {nb_transaction_attestations, rest} = rest |> VarInt.get_value() + + {transaction_attestations, rest} = + Utils.deserialize_transaction_attestations(rest, nb_transaction_attestations, []) + + <> = + rest + + <> = rest + + {nb_end_of_sync, rest} = rest |> VarInt.get_value() + + {end_of_node_synchronizations, <>} = + Utils.deserialize_public_key_list(rest, nb_end_of_sync, []) + + node_average_availabilities = for <>, do: avg / 100 + + {nb_patches, rest} = Utils.VarInt.get_value(rest) + <> = rest + + network_patches = + for <> do + patch + end + + {%__MODULE__{ + subset: <>, + summary_time: DateTime.from_unix!(summary_timestamp), + availability_adding_time: availability_adding_time, + transaction_attestations: transaction_attestations, + node_availabilities: availabilities, + node_average_availabilities: node_average_availabilities, + end_of_node_synchronizations: end_of_node_synchronizations, + network_patches: network_patches, + version: 2 }, rest} end end diff --git a/lib/archethic/beacon_chain/summary_aggregate.ex b/lib/archethic/beacon_chain/summary_aggregate.ex index 0d4f1ef4f..f56bb3cf5 100644 --- a/lib/archethic/beacon_chain/summary_aggregate.ex +++ b/lib/archethic/beacon_chain/summary_aggregate.ex @@ -8,17 +8,19 @@ defmodule Archethic.BeaconChain.SummaryAggregate do defstruct [ :summary_time, availability_adding_time: [], - version: 1, + version: 2, replication_attestations: [], p2p_availabilities: %{} ] alias Archethic.Crypto + alias Archethic.BeaconChain.Subset.P2PSampling alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.BeaconChain.Summary, as: BeaconSummary alias Archethic.Utils + alias Archethic.Utils.VarInt require Logger @@ -35,7 +37,8 @@ defmodule Archethic.BeaconChain.SummaryAggregate do (subset :: binary()) => %{ node_availabilities: bitstring(), node_average_availabilities: list(float()), - end_of_node_synchronizations: list(Crypto.key()) + end_of_node_synchronizations: list(Crypto.key()), + network_patches: list(list(String.t()) | String.t()) } } } @@ -52,7 +55,8 @@ defmodule Archethic.BeaconChain.SummaryAggregate do node_availabilities: node_availabilities, node_average_availabilities: node_average_availabilities, end_of_node_synchronizations: end_of_node_synchronizations, - availability_adding_time: availability_adding_time + availability_adding_time: availability_adding_time, + network_patches: network_patches } ) do agg = @@ -76,7 +80,8 @@ defmodule Archethic.BeaconChain.SummaryAggregate do Access.key(subset, %{ node_availabilities: [], node_average_availabilities: [], - end_of_node_synchronizations: [] + end_of_node_synchronizations: [], + network_patches: [] }) ], fn prev -> @@ -93,6 +98,10 @@ defmodule Archethic.BeaconChain.SummaryAggregate do :end_of_node_synchronizations, &Enum.concat(&1, end_of_node_synchronizations) ) + |> Map.update!( + :network_patches, + &Enum.concat(&1, [network_patches]) + ) end ) else @@ -102,6 +111,7 @@ defmodule Archethic.BeaconChain.SummaryAggregate do @doc """ Aggregate summaries batch + """ @spec aggregate(t()) :: t() def aggregate(agg) do @@ -125,7 +135,8 @@ defmodule Archethic.BeaconChain.SummaryAggregate do data |> Map.update!(:node_availabilities, &aggregate_node_availabilities/1) |> Map.update!(:node_average_availabilities, &aggregate_node_average_availabilities/1) - |> Map.update!(:end_of_node_synchronizations, &Enum.uniq/1)} + |> Map.update!(:end_of_node_synchronizations, &Enum.uniq/1) + |> Map.update!(:network_patches, &aggregate_network_patches(&1, subset))} end) |> Enum.into(%{}) end) @@ -188,6 +199,60 @@ defmodule Archethic.BeaconChain.SummaryAggregate do end) end + defp aggregate_network_patches(network_patches, subset) do + sampling_nodes = P2PSampling.list_nodes_to_sample(subset) + + network_patches + |> Enum.filter(&(length(&1) == length(sampling_nodes))) + |> Enum.zip() + |> Enum.map(fn network_patches -> + network_patches + |> Tuple.to_list() + |> Enum.dedup() + |> resolve_patches_conflicts() + end) + |> List.flatten() + end + + defp resolve_patches_conflicts([patch]), do: patch + + defp resolve_patches_conflicts(conflicts_patches) do + splitted_patches = Enum.map(conflicts_patches, &String.split(&1, "", trim: true)) + + # Aggregate the conflicts patch to get a final network patch + # We can use mean as the outliers will only impact the way a node + # fetch data. Because we don't if the truth is coming from the outliers + # or not, the mean will result in the smallest approximation. + latency_patch = + splitted_patches + |> Enum.map(fn d -> + d + |> Enum.take(2) + |> Enum.map(&String.to_integer(&1, 16)) + end) + |> Enum.zip() + |> Enum.map_join(fn x -> + x + |> Tuple.to_list() + |> Utils.median() + |> trunc() + |> Integer.to_string(16) + end) + + bandwidth_patch = + splitted_patches + |> Enum.map(fn digits -> + digits + |> List.last() + |> String.to_integer(16) + end) + |> Utils.median() + |> trunc() + |> Integer.to_string(16) + + "#{latency_patch}#{bandwidth_patch}" + end + @doc """ Determine when the aggregate is empty """ @@ -204,6 +269,7 @@ defmodule Archethic.BeaconChain.SummaryAggregate do ## Examples iex> %SummaryAggregate{ + ...> version: 2, ...> summary_time: ~U[2022-03-01 00:00:00Z], ...> replication_attestations: [ ...> %ReplicationAttestation{ @@ -234,14 +300,15 @@ defmodule Archethic.BeaconChain.SummaryAggregate do ...> end_of_node_synchronizations: [ ...> <<0, 1, 57, 98, 198, 202, 155, 43, 217, 149, 5, 213, 109, 252, 111, 87, 231, 170, 54, ...> 211, 178, 208, 5, 184, 33, 193, 167, 91, 160, 131, 129, 117, 45, 242>> - ...> ] + ...> ], + ...> network_patches: ["ABC", "DEF"] ...> } ...> }, ...> availability_adding_time: 900 ...> } |> SummaryAggregate.serialize() << # Version - 1, + 2, # Summary time 98, 29, 98, 0, # Nb replication attestations @@ -290,6 +357,12 @@ defmodule Archethic.BeaconChain.SummaryAggregate do # End of node synchronization 0, 1, 57, 98, 198, 202, 155, 43, 217, 149, 5, 213, 109, 252, 111, 87, 231, 170, 54, 211, 178, 208, 5, 184, 33, 193, 167, 91, 160, 131, 129, 117, 45, 242, + # Nb patches + 1, 2, + # Patch "ABC" + "ABC", + # Patch "DEF" + "DEF", # Availability adding time 3, 132 >> @@ -302,7 +375,7 @@ defmodule Archethic.BeaconChain.SummaryAggregate do p2p_availabilities: p2p_availabilities, availability_adding_time: availability_adding_time }) do - nb_attestations = Utils.VarInt.from_value(length(attestations)) + nb_attestations = VarInt.from_value(length(attestations)) attestations_bin = attestations @@ -315,21 +388,30 @@ defmodule Archethic.BeaconChain.SummaryAggregate do %{ node_availabilities: node_availabilities, node_average_availabilities: node_avg_availabilities, - end_of_node_synchronizations: end_of_sync + end_of_node_synchronizations: end_of_sync, + network_patches: network_patches }} -> - nb_node_availabilities = Utils.VarInt.from_value(bit_size(node_availabilities)) + nb_node_availabilities = VarInt.from_value(bit_size(node_availabilities)) node_avg_availabilities_bin = node_avg_availabilities |> Enum.map(fn avg -> trunc(avg * 100) end) |> :erlang.list_to_binary() - nb_end_of_sync = Utils.VarInt.from_value(length(end_of_sync)) + nb_end_of_sync = VarInt.from_value(length(end_of_sync)) end_of_sync_bin = :erlang.list_to_binary(end_of_sync) + nb_network_patches_bin = + network_patches + |> length() + |> VarInt.from_value() + + network_patches_bin = :erlang.list_to_binary(network_patches) + <> + node_avg_availabilities_bin::binary, nb_end_of_sync::binary, end_of_sync_bin::binary, + nb_network_patches_bin::binary, network_patches_bin::binary>> end) |> :erlang.list_to_bitstring() @@ -356,57 +438,112 @@ defmodule Archethic.BeaconChain.SummaryAggregate do ...> 211, 178, 208, 5, 184, 33, 193, 167, 91, 160, 131, 129, 117, 45, 242, 3, 132>>) { %SummaryAggregate{ - summary_time: ~U[2022-03-01 00:00:00Z], - replication_attestations: [ - %ReplicationAttestation{ - transaction_summary: %TransactionSummary{ - address: <<0, 0, 120, 123, 229, 13, 144, 130, 230, 18, 17, 45, 244, 92, 226, 107, 11, 104, 226, - 249, 138, 85, 71, 127, 190, 20, 186, 69, 131, 97, 194, 30, 71, 116>>, - type: :transfer, - timestamp: ~U[2022-02-01 10:00:00.204Z], - fee: 10_000_000, - validation_stamp_checksum: <<17, 8, 18, 246, 127, 161, 225, 240, 17, 127, 111, 61, 112, 36, 28, 26, 66, - 167, 176, 119, 17, 169, 60, 36, 119, 204, 81, 109, 144, 66, 249, 219>> - }, - confirmations: [ - { - 0, - <<129, 204, 107, 81, 235, 88, 234, 207, 125, 1, 208, 227, 239, 175, 78, 217, - 100, 172, 67, 228, 131, 42, 177, 200, 54, 225, 34, 241, 35, 226, 108, 138, - 201, 2, 32, 75, 92, 49, 194, 42, 113, 154, 20, 43, 216, 176, 11, 159, 188, - 119, 6, 8, 48, 201, 244, 138, 99, 52, 22, 1, 97, 123, 140, 195>> - } - ] - } - ], - p2p_availabilities: %{ - <<0>> => %{ - node_availabilities: <<1::1, 0::1, 1::1>>, - node_average_availabilities: [0.5, 0.7, 0.8], - end_of_node_synchronizations: [ - <<0, 1, 57, 98, 198, 202, 155, 43, 217, 149, 5, 213, 109, 252, 111, 87, 231, 170, 54, - 211, 178, 208, 5, 184, 33, 193, 167, 91, 160, 131, 129, 117, 45, 242>> + version: 1, + summary_time: ~U[2022-03-01 00:00:00Z], + replication_attestations: [ + %ReplicationAttestation{ + transaction_summary: %TransactionSummary{ + address: <<0, 0, 120, 123, 229, 13, 144, 130, 230, 18, 17, 45, 244, 92, 226, 107, 11, 104, 226, + 249, 138, 85, 71, 127, 190, 20, 186, 69, 131, 97, 194, 30, 71, 116>>, + type: :transfer, + timestamp: ~U[2022-02-01 10:00:00.204Z], + fee: 10_000_000, + validation_stamp_checksum: <<17, 8, 18, 246, 127, 161, 225, 240, 17, 127, 111, 61, 112, 36, 28, 26, 66, + 167, 176, 119, 17, 169, 60, 36, 119, 204, 81, 109, 144, 66, 249, 219>> + }, + confirmations: [ + { + 0, + <<129, 204, 107, 81, 235, 88, 234, 207, 125, 1, 208, 227, 239, 175, 78, 217, + 100, 172, 67, 228, 131, 42, 177, 200, 54, 225, 34, 241, 35, 226, 108, 138, + 201, 2, 32, 75, 92, 49, 194, 42, 113, 154, 20, 43, 216, 176, 11, 159, 188, + 119, 6, 8, 48, 201, 244, 138, 99, 52, 22, 1, 97, 123, 140, 195>> + } ] - } - }, - availability_adding_time: 900 - }, + } + ], + p2p_availabilities: %{ + <<0>> => %{ + node_availabilities: <<1::1, 0::1, 1::1>>, + node_average_availabilities: [0.5, 0.7, 0.8], + end_of_node_synchronizations: [ + <<0, 1, 57, 98, 198, 202, 155, 43, 217, 149, 5, 213, 109, 252, 111, 87, 231, 170, 54, + 211, 178, 208, 5, 184, 33, 193, 167, 91, 160, 131, 129, 117, 45, 242>> + ], + network_patches: [] + } + }, + availability_adding_time: 900 + }, + "" + } + + iex> SummaryAggregate.deserialize(<<2, 98, 29, 98, 0, 1, 1, 1, 1, 0, 0, 120, 123, 229, 13, 144, + ...> 130, 230, 18, 17, 45, 244, 92, 226, 107, 11, 104, 226, 249, 138, 85, 71, 127, 190, 20, 186, 69, + ...> 131, 97, 194, 30, 71, 116, 0, 0, 1, 126, 180, 186, 17, 204, 253, 0, 0, 0, 0, 0, 152, 150, 128, + ...> 1, 0, 17, 8, 18, 246, 127, 161, 225, 240, 17, 127, 111, 61, 112, 36, 28, 26, 66, + ...> 167, 176, 119, 17, 169, 60, 36, 119, 204, 81, 109, 144, 66, 249, 219, 1, 0, 64, + ...> 129, 204, 107, 81, 235, 88, 234, 207, 125, 1, 208, 227, 239, 175, 78, 217, + ...> 100, 172, 67, 228, 131, 42, 177, 200, 54, 225, 34, 241, 35, 226, 108, 138, + ...> 201, 2, 32, 75, 92, 49, 194, 42, 113, 154, 20, 43, 216, 176, 11, 159, 188, + ...> 119, 6, 8, 48, 201, 244, 138, 99, 52, 22, 1, 97, 123, 140, 195, 1, 0, 1, 3, 1::1, 0::1, 1::1, + ...> 50, 70, 80, 1, 1, 0, 1, 57, 98, 198, 202, 155, 43, 217, 149, 5, 213, 109, 252, 111, 87, 231, 170, 54, + ...> 211, 178, 208, 5, 184, 33, 193, 167, 91, 160, 131, 129, 117, 45, 242, 1, 2, "ABC", "DEF", 3, 132>>) + { + %SummaryAggregate{ + version: 2, + summary_time: ~U[2022-03-01 00:00:00Z], + replication_attestations: [ + %ReplicationAttestation{ + transaction_summary: %TransactionSummary{ + address: <<0, 0, 120, 123, 229, 13, 144, 130, 230, 18, 17, 45, 244, 92, 226, 107, 11, 104, 226, + 249, 138, 85, 71, 127, 190, 20, 186, 69, 131, 97, 194, 30, 71, 116>>, + type: :transfer, + timestamp: ~U[2022-02-01 10:00:00.204Z], + fee: 10_000_000, + validation_stamp_checksum: <<17, 8, 18, 246, 127, 161, 225, 240, 17, 127, 111, 61, 112, 36, 28, 26, 66, + 167, 176, 119, 17, 169, 60, 36, 119, 204, 81, 109, 144, 66, 249, 219>> + }, + confirmations: [ + { + 0, + <<129, 204, 107, 81, 235, 88, 234, 207, 125, 1, 208, 227, 239, 175, 78, 217, + 100, 172, 67, 228, 131, 42, 177, 200, 54, 225, 34, 241, 35, 226, 108, 138, + 201, 2, 32, 75, 92, 49, 194, 42, 113, 154, 20, 43, 216, 176, 11, 159, 188, + 119, 6, 8, 48, 201, 244, 138, 99, 52, 22, 1, 97, 123, 140, 195>> + } + ] + } + ], + p2p_availabilities: %{ + <<0>> => %{ + node_availabilities: <<1::1, 0::1, 1::1>>, + node_average_availabilities: [0.5, 0.7, 0.8], + end_of_node_synchronizations: [ + <<0, 1, 57, 98, 198, 202, 155, 43, 217, 149, 5, 213, 109, 252, 111, 87, 231, 170, 54, + 211, 178, 208, 5, 184, 33, 193, 167, 91, 160, 131, 129, 117, 45, 242>> + ], + network_patches: ["ABC", "DEF"] + } + }, + availability_adding_time: 900 + }, "" } """ @spec deserialize(bitstring()) :: {t(), bitstring()} - def deserialize(<<1::8, timestamp::32, rest::bitstring>>) do - {nb_attestations, rest} = Utils.VarInt.get_value(rest) + def deserialize(<>) do + {nb_attestations, rest} = VarInt.get_value(rest) {attestations, <>} = Utils.deserialize_transaction_attestations(rest, nb_attestations, []) {p2p_availabilities, <>} = - deserialize_p2p_availabilities(rest, nb_p2p_availabilities, %{}) + deserialize_p2p_availabilities(version, rest, nb_p2p_availabilities, %{}) { %__MODULE__{ - version: 1, + version: version, summary_time: DateTime.from_unix!(timestamp), replication_attestations: attestations, p2p_availabilities: p2p_availabilities, @@ -416,19 +553,56 @@ defmodule Archethic.BeaconChain.SummaryAggregate do } end - defp deserialize_p2p_availabilities(<<>>, _, acc), do: {acc, <<>>} + defp deserialize_p2p_availabilities(_version, <<>>, _, acc), do: {acc, <<>>} - defp deserialize_p2p_availabilities(rest, nb_p2p_availabilities, acc) + defp deserialize_p2p_availabilities(_version, rest, nb_p2p_availabilities, acc) when map_size(acc) == nb_p2p_availabilities do {acc, rest} end defp deserialize_p2p_availabilities( + 1, + <>, + nb_p2p_availabilities, + acc + ) do + {nb_node_availabilities, rest} = VarInt.get_value(rest) + + <> = rest + + node_avg_availabilities = + node_avg_availabilities_bin + |> :erlang.binary_to_list() + |> Enum.map(fn avg -> avg / 100 end) + + {nb_end_of_sync, rest} = VarInt.get_value(rest) + {end_of_node_sync, rest} = Utils.deserialize_public_key_list(rest, nb_end_of_sync, []) + + deserialize_p2p_availabilities( + 1, + rest, + nb_p2p_availabilities, + Map.put( + acc, + subset, + %{ + node_availabilities: node_availabilities, + node_average_availabilities: node_avg_availabilities, + end_of_node_synchronizations: end_of_node_sync, + network_patches: [] + } + ) + ) + end + + defp deserialize_p2p_availabilities( + version, <>, nb_p2p_availabilities, acc ) do - {nb_node_availabilities, rest} = Utils.VarInt.get_value(rest) + {nb_node_availabilities, rest} = VarInt.get_value(rest) <> = rest @@ -438,10 +612,20 @@ defmodule Archethic.BeaconChain.SummaryAggregate do |> :erlang.binary_to_list() |> Enum.map(fn avg -> avg / 100 end) - {nb_end_of_sync, rest} = Utils.VarInt.get_value(rest) + {nb_end_of_sync, rest} = VarInt.get_value(rest) + {end_of_node_sync, rest} = Utils.deserialize_public_key_list(rest, nb_end_of_sync, []) + {nb_patches, rest} = VarInt.get_value(rest) + <> = rest + + network_patches = + for <> do + patch + end + deserialize_p2p_availabilities( + version, rest, nb_p2p_availabilities, Map.put( @@ -450,7 +634,8 @@ defmodule Archethic.BeaconChain.SummaryAggregate do %{ node_availabilities: node_availabilities, node_average_availabilities: node_avg_availabilities, - end_of_node_synchronizations: end_of_node_sync + end_of_node_synchronizations: end_of_node_sync, + network_patches: network_patches } ) ) diff --git a/lib/archethic/db.ex b/lib/archethic/db.ex index bb6fe467e..9f9df6eaa 100644 --- a/lib/archethic/db.ex +++ b/lib/archethic/db.ex @@ -68,11 +68,19 @@ defmodule Archethic.DB do @callback transaction_exists?(binary(), storage_type()) :: boolean() - @callback register_p2p_summary(list({Crypto.key(), boolean(), float(), DateTime.t()})) :: :ok + @callback register_p2p_summary( + list( + {node_public_key :: Crypto.key(), available? :: boolean(), + average_availability :: float(), availability_update :: DateTime.t(), + network_patch :: binary()} + ) + ) :: + :ok @callback get_last_p2p_summaries() :: %{ (node_public_key :: Crypto.key()) => - {available? :: boolean(), average_availability :: float()} + {available? :: boolean(), average_availability :: float(), + network_patch :: binary() | nil} } @callback get_bootstrap_info(key :: String.t()) :: String.t() | nil diff --git a/lib/archethic/db/embedded_impl.ex b/lib/archethic/db/embedded_impl.ex index b9af3ece7..561e78366 100644 --- a/lib/archethic/db/embedded_impl.ex +++ b/lib/archethic/db/embedded_impl.ex @@ -344,7 +344,8 @@ defmodule Archethic.DB.EmbeddedImpl do @spec get_last_p2p_summaries() :: %{ (node_public_key :: Crypto.key()) => { available? :: boolean(), - average_availability :: float() + average_availability :: float(), + network_patch :: binary() | nil } } defdelegate get_last_p2p_summaries, to: P2PView, as: :get_views diff --git a/lib/archethic/db/embedded_impl/p2p_view.ex b/lib/archethic/db/embedded_impl/p2p_view.ex index af4cb3e88..23a4a2f5b 100644 --- a/lib/archethic/db/embedded_impl/p2p_view.ex +++ b/lib/archethic/db/embedded_impl/p2p_view.ex @@ -26,7 +26,8 @@ defmodule Archethic.DB.EmbeddedImpl.P2PView do @spec get_views :: %{ (node_public_key :: Crypto.key()) => { available? :: boolean(), - average_availability :: float() + average_availability :: float(), + network_patch :: String.t() | nil } } def get_views do @@ -69,14 +70,23 @@ defmodule Archethic.DB.EmbeddedImpl.P2PView do defp serialize([], acc), do: acc defp serialize([view | rest], acc) do - {node_key, available?, avg_availability, availability_update} = view + {node_key, available?, avg_availability, availability_update, network_patch} = view available_bit = if available?, do: 1, else: 0 avg_availability_int = (avg_availability * 100) |> trunc() + network_patch_bin = + case network_patch do + nil -> + <<0::8>> + + _ -> + <<1::8, network_patch::binary>> + end + acc = <> + DateTime.to_unix(availability_update)::32, network_patch_bin::binary>> serialize(rest, acc) end @@ -88,10 +98,23 @@ defmodule Archethic.DB.EmbeddedImpl.P2PView do <>} = Utils.deserialize_public_key(data) + {network_patch, rest} = + case rest do + <<1::8, network_patch::binary-size(3), rest::bitstring>> -> + {network_patch, rest} + + <<0::8, rest::bitstring>> -> + {nil, rest} + + _ -> + {nil, rest} + end + available? = if available_bit == 1, do: true, else: false view = - {node_key, available?, avg_availability_int / 100, DateTime.from_unix!(availability_update)} + {node_key, available?, avg_availability_int / 100, DateTime.from_unix!(availability_update), + network_patch} deserialize(rest, [view | acc]) end diff --git a/lib/archethic/oracle_chain/services/uco_price.ex b/lib/archethic/oracle_chain/services/uco_price.ex index 888f51445..cd3da0e6a 100644 --- a/lib/archethic/oracle_chain/services/uco_price.ex +++ b/lib/archethic/oracle_chain/services/uco_price.ex @@ -90,7 +90,7 @@ defmodule Archethic.OracleChain.Services.UCOPrice do deviation = [price_prior, price_now] - |> standard_deviation() + |> Utils.standard_deviation() |> Float.round(3) if deviation < deviation_threshold do @@ -104,19 +104,6 @@ defmodule Archethic.OracleChain.Services.UCOPrice do end end - defp standard_deviation(prices) do - prices_mean = mean(prices) - variance = prices |> Enum.map(fn x -> (prices_mean - x) * (prices_mean - x) end) |> mean() - :math.sqrt(variance) - end - - defp mean(prices, t \\ 0, l \\ 0) - defp mean([], t, l), do: t / l - - defp mean([x | xs], t, l) do - mean(xs, t + x, l + 1) - end - @impl Impl @spec parse_data(map()) :: {:ok, map()} | :error def parse_data(service_data) when is_map(service_data) do diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index 92490507b..311b459ea 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -134,12 +134,16 @@ defmodule Archethic.P2P do defdelegate list_authorized_public_keys, to: MemTable @doc """ - Determine if the node public key is authorized + Determine if the node public key is authorized for the given datetime (default to now) or before if needed """ - @spec authorized_node?(Crypto.key()) :: boolean() - def authorized_node?(node_public_key \\ Crypto.first_node_public_key()) - when is_binary(node_public_key) do - Utils.key_in_node_list?(authorized_nodes(), node_public_key) + @spec authorized_node?(Crypto.key(), date :: DateTime.t(), before? :: boolean()) :: boolean() + def authorized_node?( + node_public_key \\ Crypto.first_node_public_key(), + datetime = %DateTime{} \\ DateTime.utc_now(), + before? \\ false + ) + when is_binary(node_public_key) and is_boolean(before?) do + Utils.key_in_node_list?(authorized_nodes(datetime, before?), node_public_key) end @doc """ @@ -152,11 +156,19 @@ defmodule Archethic.P2P do end @doc """ - Determine if the node public key is authorized and available + Determine if the node public key is authorized and available for the given datetime (default to now) or before if needed """ - @spec authorized_and_available_node?(Crypto.key()) :: boolean() - def authorized_and_available_node?(node_public_key \\ Crypto.first_node_public_key()) do - Utils.key_in_node_list?(authorized_and_available_nodes(), node_public_key) + @spec authorized_and_available_node?( + Crypto.key(), + datetime :: DateTime.t(), + before? :: boolean() + ) :: boolean() + def authorized_and_available_node?( + node_public_key \\ Crypto.first_node_public_key(), + datetime = %DateTime{} \\ DateTime.utc_now(), + before? \\ false + ) do + Utils.key_in_node_list?(authorized_and_available_nodes(datetime, before?), node_public_key) end @doc """ @@ -756,4 +768,10 @@ defmodule Archethic.P2P do conflict_resolver.(distinct_elems) end end + + @doc """ + Update the node's network patch + """ + @spec update_node_network_patch(Crypto.key(), String.t()) :: :ok + defdelegate update_node_network_patch(node_public_key, network_patch), to: MemTable end diff --git a/lib/archethic/p2p/mem_table_loader.ex b/lib/archethic/p2p/mem_table_loader.ex index 65200c5ca..55a53dd47 100644 --- a/lib/archethic/p2p/mem_table_loader.ex +++ b/lib/archethic/p2p/mem_table_loader.ex @@ -78,17 +78,17 @@ defmodule Archethic.P2P.MemTableLoader do is_same_slot? = DateTime.compare(DateTime.utc_now(), next_repair_time) == :lt p2p_summaries = DB.get_last_p2p_summaries() - - previously_available = Enum.filter(p2p_summaries, &match?({_, true, _, _}, &1)) + previously_available = Enum.filter(p2p_summaries, &match?({_, true, _, _, _}, &1)) node_key = Crypto.first_node_public_key() case previously_available do # Ensure the only single node is globally available after a delayed bootstrap - [{^node_key, _, avg_availability, availability_update}] -> + [{^node_key, _, avg_availability, availability_update, network_patch}] -> P2P.set_node_globally_synced(node_key) P2P.set_node_globally_available(node_key, availability_update) P2P.set_node_average_availability(node_key, avg_availability) + P2P.update_node_network_patch(node_key, network_patch) [] -> P2P.set_node_globally_synced(node_key) @@ -206,7 +206,7 @@ defmodule Archethic.P2P.MemTableLoader do defp first_node_change?(_, _), do: false defp load_p2p_summary( - {node_public_key, available?, avg_availability, availability_update}, + {node_public_key, available?, avg_availability, availability_update, network_patch}, is_same_slot? ) do if available? do @@ -218,5 +218,9 @@ defmodule Archethic.P2P.MemTableLoader do end MemTable.update_node_average_availability(node_public_key, avg_availability) + + if network_patch do + MemTable.update_node_network_patch(node_public_key, network_patch) + end end end diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index 82a2b8950..68bc7c6a8 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -75,7 +75,9 @@ defmodule Archethic.P2P.Message do TransactionSummaryMessage, AcknowledgeStorage, ReplicationAttestationMessage, - GetTransactionSummary + GetTransactionSummary, + GetNetworkStats, + NetworkStats } require Logger @@ -125,6 +127,7 @@ defmodule Archethic.P2P.Message do | NotifyReplicationValidation.t() | AcknowledgeStorage.t() | GetTransactionSummary.t() + | GetNetworkStats.t() @type response :: Ok.t() @@ -150,6 +153,7 @@ defmodule Archethic.P2P.Message do | ReplicationError.t() | SummaryAggregate.t() | AddressList.t() + | NetworkStats.t() @floor_upload_speed Application.compile_env!(:archethic, [__MODULE__, :floor_upload_speed]) @content_max_size Application.compile_env!(:archethic, :transaction_data_content_max_size) diff --git a/lib/archethic/p2p/message/get_network_stats.ex b/lib/archethic/p2p/message/get_network_stats.ex new file mode 100644 index 000000000..62c9da113 --- /dev/null +++ b/lib/archethic/p2p/message/get_network_stats.ex @@ -0,0 +1,79 @@ +defmodule Archethic.P2P.Message.GetNetworkStats do + @moduledoc """ + Represents a message to get the network stats from the beacon summary cache + """ + + @enforce_keys :subsets + defstruct subsets: [] + + alias Archethic.BeaconChain + alias Archethic.Crypto + alias Archethic.P2P.Message.NetworkStats + + @type t :: %__MODULE__{ + subsets: list(binary()) + } + + @doc """ + Serialize the get network stats message into binary + + ## Examples + + iex> %GetNetworkStats{subsets: [<<0>>, <<255>>]} |> GetNetworkStats.serialize() + << + # Length of subsets + 0, 2, + # Subset + 0, 255 + >> + """ + def serialize(%__MODULE__{subsets: subsets}) do + <> + end + + @doc """ + Deserialize the binary into the get network stats message + + ## Examples + + iex> <<0, 2, 0, 255>> |> GetNetworkStats.deserialize() + { + %GetNetworkStats{subsets: [<<0>>, <<255>>]}, + "" + } + """ + def deserialize(<>) do + subsets = + subsets_binary + |> :erlang.binary_to_list() + |> Enum.map(&<<&1>>) + + { + %__MODULE__{subsets: subsets}, + rest + } + end + + @doc """ + Process the message to get the network stats from the summary cache + """ + @spec process(t(), Crypto.key()) :: NetworkStats.t() + def process(%__MODULE__{subsets: subsets}, _node_public_key) do + stats = + subsets + |> Task.async_stream(fn subset -> + stats = BeaconChain.get_network_stats(subset) + {subset, stats} + end) + |> Stream.map(fn {:ok, res} -> res end) + |> Enum.reduce(%{}, fn + {subset, stats}, acc when map_size(stats) > 0 -> + Map.put(acc, subset, stats) + + _, acc -> + acc + end) + + %NetworkStats{stats: stats} + end +end diff --git a/lib/archethic/p2p/message/message_id.ex b/lib/archethic/p2p/message/message_id.ex index 1d941512c..de16aad1d 100644 --- a/lib/archethic/p2p/message/message_id.ex +++ b/lib/archethic/p2p/message/message_id.ex @@ -65,7 +65,9 @@ defmodule Archethic.P2P.MessageId do ReplicatePendingTransactionChain, NotifyReplicationValidation, TransactionSummaryMessage, - ReplicationAttestationMessage + ReplicationAttestationMessage, + GetNetworkStats, + NetworkStats } alias Archethic.TransactionChain.{ @@ -118,8 +120,10 @@ defmodule Archethic.P2P.MessageId do ValidateTransaction => 36, ReplicatePendingTransactionChain => 37, NotifyReplicationValidation => 38, + GetNetworkStats => 39, # Responses + NetworkStats => 227, FirstTransactionAddress => 228, AddressList => 229, ShardRepair => 230, diff --git a/lib/archethic/p2p/message/network_stats.ex b/lib/archethic/p2p/message/network_stats.ex new file mode 100644 index 000000000..a1f2ba65f --- /dev/null +++ b/lib/archethic/p2p/message/network_stats.ex @@ -0,0 +1,137 @@ +defmodule Archethic.P2P.Message.NetworkStats do + @moduledoc """ + Represents network stats from the aggregated beacon chain summary's cache + """ + + defstruct stats: %{} + + alias Archethic.BeaconChain.Slot + alias Archethic.Crypto + alias Archethic.Utils + alias Archethic.Utils.VarInt + + @type t :: %__MODULE__{ + stats: %{ + (subset :: binary) => %{ + Crypto.key() => Slot.net_stats() + } + } + } + + @doc """ + Serializes the network stats into binary + + ## Examples + + iex> %NetworkStats{stats: %{ <<0>> => %{ + ...> <<0, 0, 75, 23, 134, 64, 221, 117, 107, 77, 233, 123, 201, 244, 18, 151, 8, 255, + ...> 53, 137, 251, 197, 67, 25, 38, 95, 2, 62, 216, 131, 112, 116, 238, 180>> => [%{latency: 100}, %{latency: 110}, %{latency: 80}] + ...> }}} |> NetworkStats.serialize() + << + # Nb subsets + 0, 1, + # Subset + 0, + # Nb node stats + 1, 1, + # Node public key + 0, 0, 75, 23, 134, 64, 221, 117, 107, 77, 233, 123, 201, 244, 18, 151, 8, 255, + 53, 137, 251, 197, 67, 25, 38, 95, 2, 62, 216, 131, 112, 116, 238, 180, + # Nb latencies + 1, 3, + # Latencies + 1, 100, + 1, 110, + 1, 80 + >> + """ + @spec serialize(t()) :: bitstring() + def serialize(%__MODULE__{stats: stats}) do + nb_subsets = map_size(stats) + + stats_binary = + Enum.map(stats, fn {subset, stats} -> + serialize_subset_stats(subset, stats) + end) + |> :erlang.list_to_binary() + + <> + end + + defp serialize_subset_stats(subset, stats) do + stats_bin = + stats + |> Enum.map(fn {node_public_key, latencies} -> + nb_latencies_bin = VarInt.from_value(length(latencies)) + + latency_bin = + latencies + |> Enum.map(fn %{latency: latency} -> VarInt.from_value(latency) end) + |> :erlang.list_to_binary() + + <> + end) + |> :erlang.list_to_binary() + + nb_stats = map_size(stats) + nb_stats_bin = VarInt.from_value(nb_stats) + <> + end + + @doc """ + Deserialize the binary into a network stats message + + ## Examples + + iex> <<0, 1, 0, 1, 1, 0, 0, 75, 23, 134, 64, 221, 117, 107, 77, 233, 123, 201, 244, 18, 151, + ...> 8, 255, 53, 137, 251, 197, 67, 25, 38, 95, 2, 62, 216, 131, 112, 116, 238, + ...> 180, 1, 3, 1, 100, 1, 110, 1, 80>> |> NetworkStats.deserialize() + { + %NetworkStats{ + stats: %{ + <<0>> => %{ + <<0, 0, 75, 23, 134, 64, 221, 117, 107, 77, 233, 123, 201, 244, 18, 151, 8, 255, + 53, 137, 251, 197, 67, 25, 38, 95, 2, 62, 216, 131, 112, 116, 238, 180>> => [%{latency: 100}, %{latency: 110}, %{latency: 80}] + } + } + }, + "" + } + """ + @spec deserialize(bitstring()) :: {t(), bitstring()} + def deserialize(<>) do + {stats, rest} = get_subsets_stats(rest, nb_subsets, %{}) + + { + %__MODULE__{stats: stats}, + rest + } + end + + defp get_subsets_stats(<<>>, _nb_subsets, acc), do: {acc, <<>>} + defp get_subsets_stats(rest, nb_subsets, acc) when map_size(acc) == nb_subsets, do: {acc, rest} + + defp get_subsets_stats(<>, nb_subsets, acc) do + {nb_stats, rest} = VarInt.get_value(data) + {stats, rest} = get_stats(rest, nb_stats, %{}) + get_subsets_stats(rest, nb_subsets, Map.put(acc, subset, stats)) + end + + defp get_stats(<<>>, _nb_stats, acc), do: {acc, <<>>} + defp get_stats(rest, nb_stats, acc) when map_size(acc) == nb_stats, do: {acc, rest} + + defp get_stats(data, nb_stats, acc) do + {node_public_key, rest} = Utils.deserialize_public_key(data) + {nb_latencies, rest} = VarInt.get_value(rest) + {latencies, rest} = get_latencies(rest, nb_latencies, []) + get_stats(rest, nb_stats, Map.put(acc, node_public_key, latencies)) + end + + defp get_latencies(rest, nb, acc) when length(acc) == nb, do: {Enum.reverse(acc), rest} + defp get_latencies(<<>>, _, acc), do: {Enum.reverse(acc), <<>>} + + defp get_latencies(data, nb, acc) do + {latency, rest} = VarInt.get_value(data) + get_latencies(rest, nb, [%{latency: latency} | acc]) + end +end diff --git a/lib/archethic/p2p/message/new_beacon_slot.ex b/lib/archethic/p2p/message/new_beacon_slot.ex index feb5e129d..7e2401a3c 100644 --- a/lib/archethic/p2p/message/new_beacon_slot.ex +++ b/lib/archethic/p2p/message/new_beacon_slot.ex @@ -22,7 +22,10 @@ defmodule Archethic.P2P.Message.NewBeaconSlot do } @spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | Error.t() - def process(%__MODULE__{slot: slot = %Slot{subset: subset, slot_time: slot_time}}, _) do + def process( + %__MODULE__{slot: slot = %Slot{subset: subset, slot_time: slot_time}}, + node_public_key + ) do summary_time = BeaconChain.next_summary_date(slot_time) node_list = P2P.authorized_and_available_nodes(summary_time, true) @@ -36,7 +39,7 @@ defmodule Archethic.P2P.Message.NewBeaconSlot do # Load BeaconChain's slot only for the summary nodes with true <- Utils.key_in_node_list?(beacon_summary_nodes, Crypto.first_node_public_key()), - :ok <- BeaconChain.load_slot(slot) do + :ok <- BeaconChain.load_slot(slot, node_public_key) do %Ok{} else false -> diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 7ab7d682a..99ed8e6df 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -298,7 +298,8 @@ defmodule Archethic.SelfRepair.Sync do %{ node_availabilities: node_availabilities, node_average_availabilities: node_average_availabilities, - end_of_node_synchronizations: end_of_node_synchronizations + end_of_node_synchronizations: end_of_node_synchronizations, + network_patches: network_patches }}, acc -> sync_node(end_of_node_synchronizations) @@ -308,6 +309,7 @@ defmodule Archethic.SelfRepair.Sync do summary_time, node_availabilities, node_average_availabilities, + network_patches, acc ) end) @@ -360,6 +362,7 @@ defmodule Archethic.SelfRepair.Sync do time, node_availabilities, node_average_availabilities, + network_patches, acc ) do node_list = Enum.filter(P2P.list_nodes(), &(DateTime.diff(&1.enrollment_date, time) <= 0)) @@ -372,18 +375,24 @@ defmodule Archethic.SelfRepair.Sync do |> Enum.reduce(acc, fn {available_bit, index}, acc -> node = Enum.at(subset_node_list, index) avg_availability = Enum.at(node_average_availabilities, index) - - if available_bit == 1 and node.synced? do - Map.put(acc, node, %{available?: true, average_availability: avg_availability}) - else - Map.put(acc, node, %{available?: false, average_availability: avg_availability}) - end + network_patch = Enum.at(network_patches, index) + available? = available_bit == 1 and node.synced? + + Map.put(acc, node, %{ + available?: available?, + average_availability: avg_availability, + network_patch: network_patch + }) end) end defp update_availabilities( {%Node{first_public_key: node_key}, - %{available?: available?, average_availability: avg_availability}}, + %{ + available?: available?, + average_availability: avg_availability, + network_patch: network_patch + }}, availability_update ) do if available? do @@ -395,9 +404,13 @@ defmodule Archethic.SelfRepair.Sync do P2P.set_node_average_availability(node_key, avg_availability) + if network_patch do + P2P.update_node_network_patch(node_key, network_patch) + end + %Node{availability_update: availability_update} = P2P.get_node_info!(node_key) - {node_key, available?, avg_availability, availability_update} + {node_key, available?, avg_availability, availability_update, network_patch} end defp update_statistics(date, []) do diff --git a/lib/archethic/utils.ex b/lib/archethic/utils.ex index 02ceeee85..665f656bd 100644 --- a/lib/archethic/utils.ex +++ b/lib/archethic/utils.ex @@ -996,6 +996,67 @@ defmodule Archethic.Utils do |> Base.encode16() end + @doc """ + Return the standard deviation from a list + + ### Examples + + iex> Utils.standard_deviation([1, 2, 3, 4]) + 1.118034 + """ + @spec standard_deviation(list()) :: number() + def standard_deviation(list) do + list_mean = mean(list) + + list + |> Enum.map(fn x -> (list_mean - x) * (list_mean - x) end) + |> mean() + |> :math.sqrt() + |> Float.round(6) + end + + @doc """ + Return the mean from a list + + ### Examples + + iex> Utils.mean([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) + 5.5 + """ + @spec mean(list()) :: number() + def mean(list, t \\ 0, l \\ 0) + def mean([], t, l), do: t / l + + def mean([x | xs], t, l) do + mean(xs, t + x, l + 1) + end + + @doc """ + Chunk a list into N sub lists + + ### Examples + + iex> Utils.chunk_list_in([1, 2, 3, 4, 5, 6], 3) + [ [1, 2], [3, 4], [5, 6] ] + + iex> Utils.chunk_list_in([1, 2, 3, 4, 5, 6, 7], 3) + [ [1, 2], [3, 4], [5, 6, 7] ] + """ + @spec chunk_list_in(list(), pos_integer()) :: list(list()) + def chunk_list_in(list, parts) when is_list(list) and is_number(parts) and parts > 0 do + list + |> do_chunk(parts, []) + |> Enum.reverse() + end + + defp do_chunk(_, 0, chunks), do: chunks + + defp do_chunk(to_chunk, parts, chunks) do + chunk_length = to_chunk |> length() |> div(parts) + {chunk, rest} = Enum.split(to_chunk, chunk_length) + do_chunk(rest, parts - 1, [chunk | chunks]) + end + @spec await_confirmation(tx_address :: binary(), list(Node.t())) :: :ok | {:error, :network_issue} def await_confirmation(tx_address, nodes) do diff --git a/test/archethic/beacon_chain/network_coordinates_test.exs b/test/archethic/beacon_chain/network_coordinates_test.exs index 378bea551..f9dca62b1 100644 --- a/test/archethic/beacon_chain/network_coordinates_test.exs +++ b/test/archethic/beacon_chain/network_coordinates_test.exs @@ -1,6 +1,81 @@ defmodule Archethic.BeaconChain.NetworkCoordinatesTest do - use ExUnit.Case + use ArchethicCase alias Archethic.BeaconChain.NetworkCoordinates + + alias Archethic.P2P + alias Archethic.P2P.Message.GetNetworkStats + alias Archethic.P2P.Message.NetworkStats + alias Archethic.P2P.Node + doctest NetworkCoordinates + + import Mox + + describe "fetch_network_stats/1" do + test "should retrieve the stats for a given summary time" do + beacon_nodes = + Enum.map(0..2, fn i -> + %Node{ + first_public_key: <<0::8, 0::8, 1::8, "key_b#{i}">>, + last_public_key: <<0::8, 0::8, 1::8, "key_b#{i}">>, + ip: {127, 0, 0, 1}, + port: 3000 + i, + authorized?: true, + authorization_date: DateTime.utc_now(), + available?: true, + geo_patch: "BBB" + } + end) + + sampled_nodes = + Enum.map(0..2, fn i -> + %Node{ + first_public_key: <<0::8, 0::8, 0::8, "key_s#{i}">>, + last_public_key: <<0::8, 0::8, 0::8, "key_s#{i}">>, + ip: {127, 0, 0, 10 + i}, + port: 3010 + i + } + end) + + Enum.each(beacon_nodes, &P2P.add_and_connect_node/1) + Enum.each(sampled_nodes, &P2P.add_and_connect_node/1) + + MockClient + |> stub(:send_message, fn + _, %GetNetworkStats{subsets: _}, _ -> + {:ok, + %NetworkStats{ + stats: %{ + <<0>> => %{ + <<0::8, 0::8, 1::8, "key_b0">> => [ + %{latency: 100}, + %{latency: 110}, + %{latency: 90} + ], + <<0::8, 0::8, 1::8, "key_b1">> => [ + %{latency: 100}, + %{latency: 105}, + %{latency: 90} + ], + <<0::8, 0::8, 1::8, "key_b2">> => [ + %{latency: 90}, + %{latency: 105}, + %{latency: 90} + ] + } + } + }} + end) + + assert Nx.tensor([ + [0, 0, 0, 100, 100, 90], + [0, 0, 0, 110, 105, 105], + [0, 0, 0, 90, 90, 90], + [100, 110, 90, 0, 0, 0], + [100, 105, 90, 0, 0, 0], + [90, 105, 90, 0, 0, 0] + ]) == NetworkCoordinates.fetch_network_stats(DateTime.utc_now()) + end + end end diff --git a/test/archethic/beacon_chain/subset/summary_cache_test.exs b/test/archethic/beacon_chain/subset/summary_cache_test.exs index 8d9d63e3f..8223726f5 100644 --- a/test/archethic/beacon_chain/subset/summary_cache_test.exs +++ b/test/archethic/beacon_chain/subset/summary_cache_test.exs @@ -5,19 +5,58 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do alias Archethic.BeaconChain.Subset.SummaryCache alias Archethic.BeaconChain.Slot + alias Archethic.BeaconChain.SlotTimer alias Archethic.BeaconChain.Slot.EndOfNodeSync - alias Archethic.BeaconChain.SummaryTimer + alias Archethic.Crypto + alias Archethic.Utils alias Archethic.TransactionChain.TransactionSummary - test "summary cache should backup a slot, recover it on restart and delete backup on pop_slots" do + test "should clean the previous summary slots after the new epoch event" do + {:ok, _pid} = SummaryTimer.start_link([interval: "0 0 * * * *"], []) + {:ok, _pid} = SlotTimer.start_link([interval: "10 * * * *"], []) + {:ok, pid} = SummaryCache.start_link() + File.mkdir_p!(Utils.mut_dir()) + + subset = <<0>> + + slot_pre_summary = %Slot{ + slot_time: ~U[2023-01-01 07:59:50Z], + subset: subset + } + + slot_post_summary = %Slot{ + slot_time: ~U[2023-01-01 08:00:20Z], + subset: subset + } + + previous_summary_time = SummaryTimer.previous_summary(~U[2023-01-01 08:01:00Z]) + + first_slot_time = SlotTimer.next_slot(previous_summary_time) + SummaryCache.add_slot(subset, slot_pre_summary, "node_key") + SummaryCache.add_slot(subset, slot_post_summary, "node_key") + + send(pid, {:current_epoch_of_slot_timer, first_slot_time}) + Process.sleep(100) + + assert [{^slot_post_summary, "node_key"}] = + subset + |> SummaryCache.stream_current_slots() + |> Enum.to_list() + + recover_path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(previous_summary_time)}") + assert !File.exists?(recover_path) + end + + test "summary cache should backup a slot, recover it on restart" do {:ok, _pid} = SummaryTimer.start_link([interval: "0 0 * * * * *"], []) File.mkdir_p!(Utils.mut_dir()) - path = Utils.mut_dir("slot_backup") + next_summary_time = SummaryTimer.next_summary(DateTime.utc_now()) + path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(next_summary_time)}") {:ok, pid} = SummaryCache.start_link() @@ -62,9 +101,10 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do } } - :ok = SummaryCache.add_slot(<<0>>, slot) + node_key = Crypto.first_node_public_key() + :ok = SummaryCache.add_slot(<<0>>, slot, node_key) - assert [^slot] = :ets.lookup_element(:archethic_summary_cache, <<0>>, 2) + assert [{^slot, ^node_key}] = :ets.lookup_element(:archethic_summary_cache, <<0>>, 2) assert File.exists?(path) GenServer.stop(pid) @@ -72,9 +112,7 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do {:ok, _} = SummaryCache.start_link() - slots = SummaryCache.pop_slots(<<0>>) - assert !File.exists?(path) - - assert [^slot] = slots + slots = SummaryCache.stream_current_slots(<<0>>) |> Enum.to_list() + assert [{^slot, ^node_key}] = slots end end diff --git a/test/archethic/beacon_chain/subset_test.exs b/test/archethic/beacon_chain/subset_test.exs index d5b52dbbd..be9458f3a 100644 --- a/test/archethic/beacon_chain/subset_test.exs +++ b/test/archethic/beacon_chain/subset_test.exs @@ -9,6 +9,7 @@ defmodule Archethic.BeaconChain.SubsetTest do Summary, SummaryTimer, Subset.SummaryCache, + Subset.StatsCollector, Subset } @@ -19,6 +20,8 @@ defmodule Archethic.BeaconChain.SubsetTest do alias Archethic.P2P alias Archethic.P2P.Message.BeaconUpdate alias Archethic.P2P.Message.NewBeaconSlot + alias Archethic.P2P.Message.GetNetworkStats + alias Archethic.P2P.Message.NetworkStats alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.Ping alias Archethic.P2P.Node @@ -40,6 +43,8 @@ defmodule Archethic.BeaconChain.SubsetTest do authorization_date: DateTime.utc_now() |> DateTime.add(-1) }) + StatsCollector.start_link() + {:ok, subset: <<0>>} end @@ -325,14 +330,26 @@ defmodule Archethic.BeaconChain.SubsetTest do {:ok, %Ok{}} _, %NewBeaconSlot{slot: slot = %Slot{subset: subset}}, _ -> - SummaryCache.add_slot(subset, slot) + SummaryCache.add_slot(subset, slot, Crypto.first_node_public_key()) {:ok, %Ok{}} + + _, %GetNetworkStats{subsets: _}, _ -> + {:ok, + %NetworkStats{ + stats: %{ + <> => %{ + Crypto.first_node_public_key() => [%{latency: 90}, %{latency: 100}], + <<0::8, 0::8, "key_beacon_node2">> => [%{latency: 90}, %{latency: 100}] + } + } + }} end) MockClient |> stub(:get_availability_timer, fn _, _ -> 0 end) - summary_interval = "*/3 * * * *" + summary_interval = "*/5 * * * *" + start_supervised!({SummaryTimer, interval: summary_interval}) start_supervised!({SlotTimer, interval: "*/1 * * * *"}) start_supervised!(SummaryCache) @@ -347,10 +364,8 @@ defmodule Archethic.BeaconChain.SubsetTest do P2P.add_and_connect_node(%Node{ ip: {127, 0, 0, 1}, port: 3000, - first_public_key: - <<0::8, 0::8, subset::binary-size(1), :crypto.strong_rand_bytes(31)::binary>>, - last_public_key: - <<0::8, 0::8, subset::binary-size(1), :crypto.strong_rand_bytes(31)::binary>>, + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), geo_patch: "AAA", network_patch: "AAA", available?: true, @@ -359,6 +374,36 @@ defmodule Archethic.BeaconChain.SubsetTest do enrollment_date: ~U[2020-09-01 00:00:00Z] }) + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3001, + first_public_key: <<0::8, 0::8, "key_beacon_node2">>, + last_public_key: <<0::8, 0::8, "key_beacon_node2">>, + geo_patch: "AAA", + network_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: ~U[2020-09-01 00:00:00Z], + enrollment_date: ~U[2020-09-01 00:00:00Z] + }) + + # Sampled nodes + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3005, + first_public_key: <<0::8, 0::8, subset::binary-size(1), "key_sample_node1">>, + last_public_key: <<0::8, 0::8, subset::binary-size(1), "key_sample_node1">>, + enrollment_date: ~U[2020-09-01 00:00:00Z] + }) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3006, + first_public_key: <<0::8, 0::8, subset::binary-size(1), "key_sample_node2">>, + last_public_key: <<0::8, 0::8, subset::binary-size(1), "key_sample_node2">>, + enrollment_date: ~U[2020-09-01 00:00:00Z] + }) + tx_summary = %TransactionSummary{ address: tx_address, timestamp: tx_time, @@ -387,7 +432,8 @@ defmodule Archethic.BeaconChain.SubsetTest do %ReplicationAttestation{ transaction_summary: ^tx_summary } - ] + ], + network_patches: [_ | _] } -> send(me, :beacon_transaction_summary_stored) end) @@ -400,7 +446,7 @@ defmodule Archethic.BeaconChain.SubsetTest do |> DateTime.truncate(:millisecond) send(pid, {:create_slot, now}) - assert_receive :beacon_transaction_summary_stored + assert_receive :beacon_transaction_summary_stored, 2_000 end end diff --git a/test/archethic/beacon_chain/summary_aggregate_test.exs b/test/archethic/beacon_chain/summary_aggregate_test.exs index 194537bdf..01919f049 100644 --- a/test/archethic/beacon_chain/summary_aggregate_test.exs +++ b/test/archethic/beacon_chain/summary_aggregate_test.exs @@ -1,9 +1,94 @@ defmodule Archethic.BeaconChain.SummaryAggregateTest do - use ExUnit.Case + use ArchethicCase alias Archethic.BeaconChain.SummaryAggregate + alias Archethic.P2P + alias Archethic.P2P.Node alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.TransactionChain.TransactionSummary doctest SummaryAggregate + + describe "aggregate/1" do + test "should aggregate multiple network patches into a single one" do + P2P.add_and_connect_node(%Node{ + first_public_key: <<0::8, 0::8, 0::8, :crypto.strong_rand_bytes(31)::binary>>, + last_public_key: <<0::8, 0::8, 0::8, :crypto.strong_rand_bytes(31)::binary>>, + ip: {127, 0, 0, 1}, + port: 3000 + }) + + P2P.add_and_connect_node(%Node{ + first_public_key: <<0::8, 0::8, 0::8, :crypto.strong_rand_bytes(31)::binary>>, + last_public_key: <<0::8, 0::8, 0::8, :crypto.strong_rand_bytes(31)::binary>>, + ip: {127, 0, 0, 1}, + port: 3001 + }) + + assert %SummaryAggregate{ + p2p_availabilities: %{ + <<0>> => %{ + node_availabilities: <<>>, + node_average_availabilities: [], + end_of_node_synchronizations: [], + network_patches: ["ABC", "DEF"] + } + } + } = + %SummaryAggregate{ + p2p_availabilities: %{ + <<0>> => %{ + node_availabilities: [], + node_average_availabilities: [], + end_of_node_synchronizations: [], + network_patches: [["ABC", "DEF"], ["ABC", "DEF"]] + } + } + } + |> SummaryAggregate.aggregate() + end + + test "should aggregate multiple different network patches into a single one" do + P2P.add_and_connect_node(%Node{ + first_public_key: <<0::8, 0::8, 0::8, :crypto.strong_rand_bytes(31)::binary>>, + last_public_key: <<0::8, 0::8, 0::8, :crypto.strong_rand_bytes(31)::binary>>, + ip: {127, 0, 0, 1}, + port: 3000 + }) + + P2P.add_and_connect_node(%Node{ + first_public_key: <<0::8, 0::8, 0::8, :crypto.strong_rand_bytes(31)::binary>>, + last_public_key: <<0::8, 0::8, 0::8, :crypto.strong_rand_bytes(31)::binary>>, + ip: {127, 0, 0, 1}, + port: 3001 + }) + + assert %SummaryAggregate{ + p2p_availabilities: %{ + <<0>> => %{ + node_availabilities: <<>>, + node_average_availabilities: [], + end_of_node_synchronizations: [], + network_patches: ["BA6", "DEF"] + } + } + } = + %SummaryAggregate{ + p2p_availabilities: %{ + <<0>> => %{ + node_availabilities: [], + node_average_availabilities: [], + end_of_node_synchronizations: [], + network_patches: [ + ["ABC", "DEF"], + ["C90", "DEF"], + ["FFF", "DEF"], + ["000", "DEF"] + ] + } + } + } + |> SummaryAggregate.aggregate() + end + end end diff --git a/test/archethic/beacon_chain_test.exs b/test/archethic/beacon_chain_test.exs index ff04f3d8c..ae55b5afe 100644 --- a/test/archethic/beacon_chain_test.exs +++ b/test/archethic/beacon_chain_test.exs @@ -20,6 +20,7 @@ defmodule Archethic.BeaconChainTest do alias Archethic.P2P alias Archethic.P2P.Message.GetBeaconSummaries + alias Archethic.P2P.Message.GetTransactionSummary alias Archethic.P2P.Message.BeaconSummaryList alias Archethic.P2P.Message.GetCurrentSummaries @@ -92,11 +93,12 @@ defmodule Archethic.BeaconChainTest do transaction_attestations: [] } - assert :ok = BeaconChain.load_slot(slot) + assert :ok = BeaconChain.load_slot(slot, Crypto.first_node_public_key()) Process.sleep(500) - assert [%Slot{subset: <<0>>}] = SummaryCache.pop_slots(<<0>>) + assert [{%Slot{subset: <<0>>}, _}] = + SummaryCache.stream_current_slots(<<0>>) |> Enum.to_list() end end @@ -508,6 +510,171 @@ defmodule Archethic.BeaconChainTest do assert [0.925, 0.8, 0.925, 0.85] == node_average_availabilities end + + test "should find other beacon summaries and accumulate network patches", %{ + summary_time: summary_time, + nodes: [node1, node2, node3, node4] + } do + summary_v1 = %Summary{ + subset: "A", + summary_time: summary_time, + node_availabilities: <<1::1, 1::1>>, + network_patches: ["ABC", "DEF"] + } + + summary_v2 = %Summary{ + subset: "A", + summary_time: summary_time, + node_availabilities: <<1::1, 1::1>>, + network_patches: ["ABC", "DEF"] + } + + summary_v3 = %Summary{ + subset: "A", + summary_time: summary_time, + node_availabilities: <<1::1, 1::1>>, + network_patches: ["ABC", "DEF"] + } + + summary_v4 = %Summary{ + subset: "A", + summary_time: summary_time, + node_availabilities: <<1::1, 1::1>>, + network_patches: ["ABC", "DEF"] + } + + subset_address = Crypto.derive_beacon_chain_address("A", summary_time, true) + + MockClient + |> stub(:send_message, fn + ^node1, %GetBeaconSummaries{addresses: addresses}, _ -> + summaries = + if subset_address in addresses do + [summary_v1] + else + [] + end + + {:ok, %BeaconSummaryList{summaries: summaries}} + + ^node2, %GetBeaconSummaries{addresses: addresses}, _ -> + summaries = + if subset_address in addresses do + [summary_v2] + else + [] + end + + {:ok, %BeaconSummaryList{summaries: summaries}} + + ^node3, %GetBeaconSummaries{addresses: addresses}, _ -> + summaries = + if subset_address in addresses do + [summary_v3] + else + [] + end + + {:ok, %BeaconSummaryList{summaries: summaries}} + + ^node4, %GetBeaconSummaries{addresses: addresses}, _ -> + summaries = + if subset_address in addresses do + [summary_v4] + else + [] + end + + {:ok, %BeaconSummaryList{summaries: summaries}} + end) + + assert %SummaryAggregate{ + p2p_availabilities: %{ + "A" => %{ + network_patches: [ + ["ABC", "DEF"], + ["ABC", "DEF"], + ["ABC", "DEF"], + ["ABC", "DEF"] + ] + } + } + } = + BeaconChain.fetch_and_aggregate_summaries( + summary_time, + P2P.authorized_and_available_nodes() + ) + end + end + + describe "get_network_stats/1" do + test "should get the slot latencies aggregated by node" do + node1_slots = [ + %Slot{ + subset: <<0>>, + slot_time: DateTime.utc_now(), + p2p_view: %{ + availabilities: <<>>, + network_stats: [%{latency: 100}, %{latency: 200}, %{latency: 50}] + } + }, + %Slot{ + subset: <<0>>, + slot_time: DateTime.utc_now() |> DateTime.add(10), + p2p_view: %{ + availabilities: <<>>, + network_stats: [%{latency: 110}, %{latency: 150}, %{latency: 70}] + } + }, + %Slot{ + subset: <<0>>, + slot_time: DateTime.utc_now() |> DateTime.add(20), + p2p_view: %{ + availabilities: <<>>, + network_stats: [%{latency: 130}, %{latency: 110}, %{latency: 80}] + } + } + ] + + node2_slots = [ + %Slot{ + subset: <<0>>, + slot_time: DateTime.utc_now(), + p2p_view: %{ + availabilities: <<>>, + network_stats: [%{latency: 80}, %{latency: 110}, %{latency: 150}] + } + }, + %Slot{ + subset: <<0>>, + slot_time: DateTime.utc_now() |> DateTime.add(10), + p2p_view: %{ + availabilities: <<>>, + network_stats: [%{latency: 70}, %{latency: 140}, %{latency: 100}] + } + }, + %Slot{ + subset: <<0>>, + slot_time: DateTime.utc_now() |> DateTime.add(20), + p2p_view: %{ + availabilities: <<>>, + network_stats: [%{latency: 70}, %{latency: 100}, %{latency: 120}] + } + } + ] + + File.mkdir_p!(Utils.mut_dir()) + SummaryCache.start_link() + SummaryTimer.start_link(interval: "0 0 0 * *") + + Enum.map(node1_slots, &SummaryCache.add_slot(<<0>>, &1, "node1")) + Enum.map(node2_slots, &SummaryCache.add_slot(<<0>>, &1, "node2")) + + assert %{ + "node1" => [%{latency: 118}, %{latency: 138}, %{latency: 71}], + "node2" => [%{latency: 75}, %{latency: 118}, %{latency: 128}] + } = BeaconChain.get_network_stats(<<0>>) + end end describe "list_transactions_summaries_from_current_slot/0" do diff --git a/test/archethic/db/embedded_impl_test.exs b/test/archethic/db/embedded_impl_test.exs index 3ca1a2171..0a285d249 100644 --- a/test/archethic/db/embedded_impl_test.exs +++ b/test/archethic/db/embedded_impl_test.exs @@ -257,7 +257,8 @@ defmodule Archethic.DB.EmbeddedTest do <<0>> => %{ node_availabilities: <<1::1>>, node_average_availabilities: [1.0], - end_of_node_synchronizations: [] + end_of_node_synchronizations: [], + network_patches: ["AAA"] } }, availability_adding_time: 900 @@ -848,7 +849,7 @@ defmodule Archethic.DB.EmbeddedTest do describe "P2P summaries listing" do test "should register new P2P summary " do node_public_key = :crypto.strong_rand_bytes(32) - views = [{node_public_key, true, 0.8, DateTime.utc_now()}] + views = [{node_public_key, true, 0.8, DateTime.utc_now(), "AAA"}] EmbeddedImpl.register_p2p_summary(views) @@ -857,8 +858,8 @@ defmodule Archethic.DB.EmbeddedTest do node_public_key2 = :crypto.strong_rand_bytes(32) views = [ - {node_public_key, true, 0.8, DateTime.utc_now()}, - {node_public_key2, true, 0.5, DateTime.utc_now()} + {node_public_key, true, 0.8, DateTime.utc_now(), "AAA"}, + {node_public_key2, true, 0.5, DateTime.utc_now(), "AAA"} ] EmbeddedImpl.register_p2p_summary(views) diff --git a/test/archethic/p2p/message/get_network_stats_test.exs b/test/archethic/p2p/message/get_network_stats_test.exs new file mode 100644 index 000000000..117e80ea4 --- /dev/null +++ b/test/archethic/p2p/message/get_network_stats_test.exs @@ -0,0 +1,7 @@ +defmodule Archethic.P2P.Message.GetNetworkStatsTest do + @moduledoc false + use ExUnit.Case + + alias Archethic.P2P.Message.GetNetworkStats + doctest GetNetworkStats +end diff --git a/test/archethic/p2p/message/network_stats_test.exs b/test/archethic/p2p/message/network_stats_test.exs new file mode 100644 index 000000000..9cdb64a3a --- /dev/null +++ b/test/archethic/p2p/message/network_stats_test.exs @@ -0,0 +1,7 @@ +defmodule Archethic.P2P.Message.NetworkStatsTest do + @moduledoc false + use ExUnit.Case + + alias Archethic.P2P.Message.NetworkStats + doctest NetworkStats +end