From a4533778f878c2e329c9d06d628bf103c76e86f8 Mon Sep 17 00:00:00 2001 From: Samuel <42943690+samuel-uniris@users.noreply.github.com> Date: Mon, 27 Jun 2022 13:32:15 +0200 Subject: [PATCH] Improve self-repair summary downloads (#400) * Group summaries fetch by node * Stream beacon summaries --- lib/archethic/beacon_chain.ex | 114 +++- .../beacon_chain/summary_aggregate.ex | 157 ++++++ lib/archethic/self_repair/sync.ex | 172 +++++-- .../self_repair/sync/beacon_aggregate.ex | 66 --- .../sync/beacon_summary_handler.ex | 320 ------------ .../transaction_handler.ex | 2 +- lib/archethic_web/live/chains/beacon_live.ex | 32 +- test/archethic/beacon_chain_test.exs | 329 ++++++++++++ .../sync/beacon_summary_handler_test.exs | 486 ------------------ .../transaction_handler_test.exs | 4 +- test/archethic/self_repair/sync_test.exs | 137 ++++- 11 files changed, 863 insertions(+), 956 deletions(-) create mode 100644 lib/archethic/beacon_chain/summary_aggregate.ex delete mode 100644 lib/archethic/self_repair/sync/beacon_aggregate.ex delete mode 100644 lib/archethic/self_repair/sync/beacon_summary_handler.ex rename lib/archethic/self_repair/sync/{beacon_summary_handler => }/transaction_handler.ex (97%) delete mode 100644 test/archethic/self_repair/sync/beacon_summary_handler_test.exs rename test/archethic/self_repair/sync/{beacon_summary_handler => }/transaction_handler_test.exs (96%) diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index e9293edf6..3632ed02a 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -4,16 +4,16 @@ defmodule Archethic.BeaconChain do to retrieve the beacon storage nodes involved. """ - alias Archethic.BeaconChain.Slot - alias Archethic.BeaconChain.Slot.EndOfNodeSync - alias Archethic.BeaconChain.Slot.Validation, as: SlotValidation - - alias Archethic.BeaconChain.SlotTimer - alias Archethic.BeaconChain.Subset - alias Archethic.BeaconChain.Subset.P2PSampling - alias Archethic.BeaconChain.Subset.SummaryCache - alias Archethic.BeaconChain.Summary - alias Archethic.BeaconChain.SummaryTimer + alias __MODULE__.Slot + alias __MODULE__.Slot.EndOfNodeSync + alias __MODULE__.Slot.Validation, as: SlotValidation + alias __MODULE__.SlotTimer + alias __MODULE__.Subset + alias __MODULE__.Subset.P2PSampling + alias __MODULE__.Subset.SummaryCache + alias __MODULE__.Summary + alias __MODULE__.SummaryAggregate + alias __MODULE__.SummaryTimer alias Archethic.Crypto @@ -21,6 +21,8 @@ defmodule Archethic.BeaconChain do alias Archethic.P2P alias Archethic.P2P.Node + alias Archethic.P2P.Message.GetBeaconSummaries + alias Archethic.P2P.Message.BeaconSummaryList alias Archethic.P2P.Message.RegisterBeaconUpdates alias Archethic.TaskSupervisor @@ -259,4 +261,96 @@ defmodule Archethic.BeaconChain do }) end) end + + @doc """ + Request from the beacon chains all the summaries for the given dates and aggregate them + + ``` + [0, 1, ...] Subsets + / | \ + / | \ + [ ] [ ] [ ] Node election for each dates to sync + |\ /|\ /| + | \/ | \/ | + | /\ | /\ | + [ ] [ ] [ ] Partition by node + | | | + [ ] [ ] [ ] Aggregate addresses + | | | + [ ] [ ] [ ] Fetch summaries + |\ /|\ /| + | \/ | \/ | + | /\ | /\ | + [D1] [D2] [D3] Partition by date + | | | + [ ] [ ] [ ] Aggregate and consolidate summaries + \ | / + \ | / + \ | / + \ | / + [ ] + ``` + """ + @spec fetch_summary_aggregates(list(DateTime.t()) | Enumerable.t()) :: + list(SummaryAggregate.t()) + def fetch_summary_aggregates(dates) do + authorized_nodes = P2P.authorized_and_available_nodes() + + list_subsets() + |> Flow.from_enumerable() + |> Flow.flat_map(fn subset -> + # Foreach subset and date we compute concurrently the node election + dates + |> Stream.map(&get_summary_address_by_node(&1, subset, authorized_nodes)) + |> Enum.flat_map(& &1) + end) + # We partition by node + |> Flow.partition(key: {:elem, 0}) + |> Flow.reduce(fn -> %{} end, fn {node, summary_address}, acc -> + # We aggregate the addresses for a given node + Map.update(acc, node, [summary_address], &[summary_address | &1]) + end) + |> Flow.flat_map(fn {node, addresses} -> + # For this node we fetch the summaries + fetch_summaries(node, addresses) + end) + # We repartition by summary time to aggregate summaries for a date + |> Flow.partition(key: {:key, :summary_time}) + |> Flow.reduce( + fn -> %SummaryAggregate{} end, + &SummaryAggregate.add_summary(&2, &1) + ) + |> Flow.emit(:state) + |> Stream.reject(&SummaryAggregate.empty?/1) + |> Stream.map(&SummaryAggregate.aggregate/1) + |> Enum.sort_by(& &1.summary_time) + end + + defp get_summary_address_by_node(date, subset, authorized_nodes) do + filter_nodes = + Enum.filter(authorized_nodes, &(DateTime.compare(&1.authorization_date, date) == :lt)) + + summary_address = Crypto.derive_beacon_chain_address(subset, date, true) + + subset + |> Election.beacon_storage_nodes(date, filter_nodes) + |> Enum.map(fn node -> + {node, summary_address} + end) + end + + defp fetch_summaries(node, addresses) do + addresses + |> Stream.chunk_every(10) + |> Stream.flat_map(fn addresses -> + case P2P.send_message(node, %GetBeaconSummaries{addresses: addresses}) do + {:ok, %BeaconSummaryList{summaries: summaries}} -> + summaries + + _ -> + [] + end + end) + |> Enum.to_list() + end end diff --git a/lib/archethic/beacon_chain/summary_aggregate.ex b/lib/archethic/beacon_chain/summary_aggregate.ex new file mode 100644 index 000000000..af2a1dbb4 --- /dev/null +++ b/lib/archethic/beacon_chain/summary_aggregate.ex @@ -0,0 +1,157 @@ +defmodule Archethic.BeaconChain.SummaryAggregate do + @moduledoc """ + Represents an aggregate of multiple beacon summary from multiple subsets for a given date + + This will help the self-sepair to maintain an aggregated and ordered view of items to synchronize and to resolve + """ + + defstruct [:summary_time, transaction_summaries: [], p2p_availabilities: %{}] + + alias Archethic.Crypto + + alias Archethic.BeaconChain.ReplicationAttestation + alias Archethic.BeaconChain.Summary, as: BeaconSummary + + alias Archethic.TransactionChain.TransactionSummary + + alias Archethic.Utils + + @type t :: %__MODULE__{ + summary_time: DateTime.t(), + transaction_summaries: list(TransactionSummary.t()), + p2p_availabilities: %{ + (subset :: binary()) => %{ + node_availabilities: bitstring(), + node_average_availabilities: list(float()), + end_of_node_synchronizations: list(Crypto.key()) + } + } + } + + @doc """ + Aggregate a new BeaconChain's summary + """ + @spec add_summary(t(), BeaconSummary.t()) :: t() + def add_summary( + agg = %__MODULE__{}, + %BeaconSummary{ + subset: subset, + summary_time: summary_time, + transaction_attestations: transaction_attestations, + node_availabilities: node_availabilities, + node_average_availabilities: node_average_availabilities, + end_of_node_synchronizations: end_of_node_synchronizations + } + ) do + valid_attestations? = + Enum.all?(transaction_attestations, fn attestation -> + ReplicationAttestation.validate(attestation) == :ok + end) + + if valid_attestations? do + agg + |> Map.update!( + :transaction_summaries, + fn prev -> + transaction_attestations + |> Enum.map(& &1.transaction_summary) + |> Enum.concat(prev) + end + ) + |> update_in( + [ + Access.key(:p2p_availabilities, %{}), + Access.key(subset, %{ + node_availabilities: [], + node_average_availabilities: [], + end_of_node_synchronizations: [] + }) + ], + fn prev -> + prev + |> Map.update!( + :node_availabilities, + &Enum.concat(&1, [Utils.bitstring_to_integer_list(node_availabilities)]) + ) + |> Map.update!( + :node_average_availabilities, + &Enum.concat(&1, [node_average_availabilities]) + ) + |> Map.update!( + :end_of_node_synchronizations, + &Enum.concat(&1, end_of_node_synchronizations) + ) + end + ) + |> Map.update(:summary_time, summary_time, fn + nil -> summary_time + prev -> prev + end) + else + agg + end + end + + @doc """ + Aggregate summaries batch + """ + @spec aggregate(t()) :: t() + def aggregate(agg = %__MODULE__{}) do + agg + |> Map.update!(:transaction_summaries, fn transactions -> + transactions + |> Enum.uniq_by(& &1.address) + |> Enum.sort_by(& &1.timestamp, {:asc, DateTime}) + end) + |> Map.update!(:p2p_availabilities, fn availabilities_by_subject -> + availabilities_by_subject + |> Enum.map(fn {subset, data} -> + {subset, + data + |> Map.update!(:node_availabilities, &aggregate_node_availabilities/1) + |> Map.update!(:node_average_availabilities, &aggregate_node_average_availabilities/1)} + end) + |> Enum.into(%{}) + end) + end + + defp aggregate_node_availabilities(node_availabilities) do + node_availabilities + |> Enum.zip() + |> Enum.map(&Tuple.to_list/1) + |> Enum.map(fn availabilities -> + # Get the mode of the availabilities + frequencies = Enum.frequencies(availabilities) + online_frequencies = Map.get(frequencies, 1, 0) + offline_frequencies = Map.get(frequencies, 0, 0) + + if online_frequencies >= offline_frequencies do + 1 + else + 0 + end + end) + |> List.flatten() + |> Enum.map(&<<&1::1>>) + |> :erlang.list_to_bitstring() + end + + defp aggregate_node_average_availabilities(avg_availabilities) do + avg_availabilities + |> Enum.zip() + |> Enum.map(&Tuple.to_list/1) + |> Enum.map(fn avg_availabilities -> + Float.round(Enum.sum(avg_availabilities) / length(avg_availabilities), 3) + end) + end + + @doc """ + Determine when the aggregate is empty + """ + @spec empty?(t()) :: boolean() + def empty?(%__MODULE__{transaction_summaries: [], p2p_availabilities: p2p_availabilities}) + when map_size(p2p_availabilities) == 0, + do: true + + def empty?(%__MODULE__{}), do: false +end diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index c1e1a1d35..d5d7e291d 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -2,17 +2,22 @@ defmodule Archethic.SelfRepair.Sync do @moduledoc false alias Archethic.BeaconChain - alias Archethic.BeaconChain.Summary, as: BeaconSummary + alias Archethic.BeaconChain.Subset.P2PSampling + alias Archethic.BeaconChain.SummaryAggregate alias Archethic.DB - alias Archethic.Election + alias Archethic.PubSub alias Archethic.P2P alias Archethic.P2P.Node - alias __MODULE__.BeaconSummaryHandler - alias __MODULE__.BeaconSummaryAggregate + alias __MODULE__.TransactionHandler + + alias Archethic.TaskSupervisor + alias Archethic.TransactionChain + + alias Archethic.Utils require Logger @@ -99,57 +104,146 @@ defmodule Archethic.SelfRepair.Sync do start = System.monotonic_time() - authorized_nodes = P2P.authorized_and_available_nodes() - last_sync_date |> BeaconChain.next_summary_dates() - |> Flow.from_enumerable() - |> Flow.flat_map(&subsets_by_times/1) - |> Flow.partition(key: {:elem, 0}, window: flow_window()) - |> Flow.reduce(fn -> [] end, fn {time, subset}, acc -> - summary = get_beacon_summary(time, subset, authorized_nodes) + |> BeaconChain.fetch_summary_aggregates() + |> Enum.each(&process_summary_aggregate(&1, patch)) + + :telemetry.execute([:archethic, :self_repair], %{duration: System.monotonic_time() - start}) + end + + @doc """ + Process beacon summary to synchronize the transactions involving. - if BeaconSummary.empty?(summary) do + Each transactions from the beacon summary will be analyzed to determine + if the node is a storage node for this transaction. If so, it will download the + transaction from the closest storage nodes and replicate it locally. + + The P2P view will also be updated if some node information are inside the beacon chain to determine + the readiness or the availability of a node. + + Also, the number of transaction received during the beacon summary interval will be stored. + """ + @spec process_summary_aggregate(SummaryAggregate.t(), binary()) :: :ok + def process_summary_aggregate( + %SummaryAggregate{ + summary_time: summary_time, + transaction_summaries: transaction_summaries, + p2p_availabilities: p2p_availabilities + }, + node_patch + ) do + transaction_summaries + |> Enum.reject(&TransactionChain.transaction_exists?(&1.address)) + |> Enum.filter(&TransactionHandler.download_transaction?/1) + |> synchronize_transactions(node_patch) + + p2p_availabilities + |> Enum.reduce(%{}, fn {subset, + %{ + node_availabilities: node_availabilities, + node_average_availabilities: node_average_availabilities, + end_of_node_synchronizations: end_of_node_synchronizations + }}, + acc -> + sync_node(end_of_node_synchronizations) + + reduce_p2p_availabilities( + subset, + summary_time, + node_availabilities, + node_average_availabilities, acc - else - [summary | acc] - end + ) end) - |> Flow.on_trigger(fn acc, _, {:fixed, time, :done} -> - agg = %BeaconSummaryAggregate{ - summary_time: DateTime.from_unix!(time, :millisecond) - } + |> Enum.each(&update_availabilities/1) - {[Enum.reduce(acc, agg, &BeaconSummaryAggregate.add_summary(&2, &1))], acc} - end) - |> Stream.reject(&BeaconSummaryAggregate.empty?/1) - |> Enum.sort_by(& &1.summary_time) - |> Enum.each(&BeaconSummaryHandler.process_summary_aggregate(&1, patch)) + update_statistics(summary_time, length(transaction_summaries)) + end - :telemetry.execute([:archethic, :self_repair], %{duration: System.monotonic_time() - start}) + defp synchronize_transactions([], _node_patch), do: :ok + + defp synchronize_transactions(transaction_summaries, node_patch) do + Logger.info("Need to synchronize #{Enum.count(transaction_summaries)} transactions") + Logger.debug("Transaction to sync #{inspect(transaction_summaries)}") + + Task.Supervisor.async_stream_nolink( + TaskSupervisor, + transaction_summaries, + &TransactionHandler.download_transaction(&1, node_patch), + on_timeout: :kill_task + ) + |> Stream.filter(&match?({:ok, _}, &1)) + |> Stream.each(fn {:ok, tx} -> + TransactionHandler.process_transaction(tx) + end) + |> Stream.run() end - defp subsets_by_times(time) do - subsets = BeaconChain.list_subsets() - Enum.map(subsets, fn subset -> {DateTime.truncate(time, :second), subset} end) + defp sync_node(end_of_node_synchronizations) do + end_of_node_synchronizations + |> Enum.each(fn public_key -> P2P.set_node_globally_synced(public_key) end) end - defp flow_window do - Flow.Window.fixed(summary_interval(:second), :second, fn {date, _} -> - DateTime.to_unix(date, :millisecond) + defp reduce_p2p_availabilities( + subset, + time, + node_availabilities, + node_average_availabilities, + acc + ) do + node_list = Enum.filter(P2P.list_nodes(), &(DateTime.diff(&1.enrollment_date, time) <= 0)) + + subset_node_list = P2PSampling.list_nodes_to_sample(subset, node_list) + + node_availabilities + |> Utils.bitstring_to_integer_list() + |> Enum.with_index() + |> Enum.reduce(acc, fn {available_bit, index}, acc -> + node = Enum.at(subset_node_list, index) + avg_availability = Enum.at(node_average_availabilities, index) + + if available_bit == 1 and Node.synced?(node) do + Map.put(acc, node, %{available?: true, average_availability: avg_availability}) + else + Map.put(acc, node, %{available?: false, average_availability: avg_availability}) + end end) end - defp summary_interval(unit) do - next_summary = BeaconChain.next_summary_date(DateTime.utc_now()) - next_summary2 = BeaconChain.next_summary_date(next_summary) - DateTime.diff(next_summary2, next_summary, unit) + defp update_availabilities( + {%Node{first_public_key: node_key}, + %{available?: available?, average_availability: avg_availability}} + ) do + DB.register_p2p_summary(node_key, DateTime.utc_now(), available?, avg_availability) + + if available? do + P2P.set_node_globally_available(node_key) + else + P2P.set_node_globally_unavailable(node_key) + P2P.set_node_globally_unsynced(node_key) + end + + P2P.set_node_average_availability(node_key, avg_availability) end - defp get_beacon_summary(time, subset, node_list) do - filter_nodes = Enum.filter(node_list, &(DateTime.compare(&1.authorization_date, time) == :lt)) + defp update_statistics(_date, 0), do: :ok + + defp update_statistics(date, nb_transactions) do + previous_summary_time = + date + |> Utils.truncate_datetime() + |> BeaconChain.previous_summary_time() + + nb_seconds = abs(DateTime.diff(previous_summary_time, date)) + tps = nb_transactions / nb_seconds + + DB.register_tps(date, tps, nb_transactions) + + Logger.info( + "TPS #{tps} on #{Utils.time_to_string(date)} with #{nb_transactions} transactions" + ) - nodes = Election.beacon_storage_nodes(subset, time, filter_nodes) - BeaconSummaryHandler.get_full_beacon_summary(time, subset, nodes) + PubSub.notify_new_tps(tps, nb_transactions) end end diff --git a/lib/archethic/self_repair/sync/beacon_aggregate.ex b/lib/archethic/self_repair/sync/beacon_aggregate.ex deleted file mode 100644 index 769a1b4c2..000000000 --- a/lib/archethic/self_repair/sync/beacon_aggregate.ex +++ /dev/null @@ -1,66 +0,0 @@ -defmodule Archethic.SelfRepair.Sync.BeaconSummaryAggregate do - @moduledoc """ - Represents an aggregate of multiple beacon summary from multiple subsets for a given date - - This will help the self-sepair to maintain an aggregated and ordered view of items to synchronize and to resolve - """ - - defstruct [:summary_time, transaction_summaries: [], p2p_availabilities: %{}] - - alias Archethic.BeaconChain.Summary, as: BeaconSummary - alias Archethic.TransactionChain.TransactionSummary - alias Archethic.Crypto - - @type t :: %__MODULE__{ - summary_time: DateTime.t(), - transaction_summaries: list(TransactionSummary.t()), - p2p_availabilities: %{ - (subset :: binary()) => %{ - node_availabilities: bitstring(), - node_average_availabilities: list(float()), - end_of_node_synchronizations: list(Crypto.key()) - } - } - } - - @doc """ - Aggregate a new BeaconChain's summary - """ - @spec add_summary(t(), BeaconSummary.t()) :: t() - def add_summary( - agg = %__MODULE__{}, - %BeaconSummary{ - subset: subset, - transaction_attestations: attestations, - node_availabilities: node_availabilities, - node_average_availabilities: node_average_availabilities, - end_of_node_synchronizations: end_of_node_synchronizations - } - ) do - transaction_summaries = - attestations - |> Enum.map(& &1.transaction_summary) - |> Enum.concat(agg.transaction_summaries) - |> Enum.uniq_by(& &1.address) - |> Enum.sort_by(& &1.timestamp, {:asc, DateTime}) - - p2p_availabilities = - Map.put(agg.p2p_availabilities, subset, %{ - node_availabilities: node_availabilities, - node_average_availabilities: node_average_availabilities, - end_of_node_synchronizations: end_of_node_synchronizations - }) - - %{agg | transaction_summaries: transaction_summaries, p2p_availabilities: p2p_availabilities} - end - - @doc """ - Determine when the aggregate is empty - """ - @spec empty?(t()) :: boolean() - def empty?(%__MODULE__{transaction_summaries: [], p2p_availabilities: p2p_availabilities}) - when map_size(p2p_availabilities) == 0, - do: true - - def empty?(%__MODULE__{}), do: false -end diff --git a/lib/archethic/self_repair/sync/beacon_summary_handler.ex b/lib/archethic/self_repair/sync/beacon_summary_handler.ex deleted file mode 100644 index 4fc32bc03..000000000 --- a/lib/archethic/self_repair/sync/beacon_summary_handler.ex +++ /dev/null @@ -1,320 +0,0 @@ -defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler do - @moduledoc false - - alias Archethic.BeaconChain - alias Archethic.BeaconChain.ReplicationAttestation - alias Archethic.BeaconChain.Subset.P2PSampling - alias Archethic.BeaconChain.Summary, as: BeaconSummary - - alias Archethic.Crypto - - alias Archethic.DB - - alias Archethic.P2P - alias Archethic.P2P.Message.GetBeaconSummary - alias Archethic.P2P.Message.NotFound - alias Archethic.P2P.Node - - alias Archethic.PubSub - - alias Archethic.SelfRepair.Sync.BeaconSummaryAggregate - alias __MODULE__.TransactionHandler - - alias Archethic.TaskSupervisor - - alias Archethic.TransactionChain - - alias Archethic.Utils - - require Logger - - @doc """ - """ - @spec get_full_beacon_summary(DateTime.t(), binary(), list(Node.t())) :: BeaconSummary.t() - def get_full_beacon_summary(summary_time, subset, nodes) do - summary_address = Crypto.derive_beacon_chain_address(subset, summary_time, true) - - conflict_resolver = fn results -> - acc = %{ - transaction_attestations: [], - node_availabilities: [], - node_average_availabilities: [], - end_of_node_synchronizations: [] - } - - results - |> Enum.filter(&match?(%BeaconSummary{}, &1)) - |> Enum.reduce(acc, &do_reduce_beacon_summaries/2) - |> aggregate_summary(summary_time, subset) - end - - case P2P.quorum_read( - nodes, - %GetBeaconSummary{address: summary_address}, - conflict_resolver, - length(nodes) - ) do - {:ok, summary = %BeaconSummary{}} -> - summary - - _ -> - %BeaconSummary{summary_time: summary_time, subset: subset} - end - end - - defp do_reduce_beacon_summaries( - %BeaconSummary{ - transaction_attestations: transaction_attestations, - node_availabilities: node_availabilities, - node_average_availabilities: node_average_availabilities, - end_of_node_synchronizations: end_of_node_synchronizations - }, - acc - ) do - valid_attestations? = - Enum.all?(transaction_attestations, fn attestation -> - ReplicationAttestation.validate(attestation) == :ok - end) - - if valid_attestations? do - acc - |> Map.update!( - :transaction_attestations, - &Enum.concat(transaction_attestations, &1) - ) - |> Map.update!( - :node_availabilities, - &Enum.concat(&1, [Utils.bitstring_to_integer_list(node_availabilities)]) - ) - |> Map.update!( - :node_average_availabilities, - &Enum.concat(&1, [node_average_availabilities]) - ) - |> Map.update!( - :end_of_node_synchronizations, - &Enum.concat(&1, end_of_node_synchronizations) - ) - else - acc - end - end - - defp aggregate_summary( - %{ - transaction_attestations: transaction_attestations, - node_availabilities: node_availabilities, - node_average_availabilities: node_average_availabilities, - end_of_node_synchronizations: end_of_node_synchronizations - }, - summary_time, - subset - ) do - %BeaconSummary{ - subset: subset, - summary_time: summary_time, - transaction_attestations: - Enum.uniq_by(List.flatten(transaction_attestations), & &1.transaction_summary.address), - node_availabilities: aggregate_node_availabilities(node_availabilities), - node_average_availabilities: - aggregate_node_average_availabilities(node_average_availabilities), - end_of_node_synchronizations: end_of_node_synchronizations - } - end - - defp aggregate_node_availabilities(node_availabilities) do - node_availabilities - |> Enum.zip() - |> Enum.map(&Tuple.to_list/1) - |> Enum.map(fn availabilities -> - # Get the mode of the availabilities - frequencies = Enum.frequencies(availabilities) - online_frequencies = Map.get(frequencies, 1, 0) - offline_frequencies = Map.get(frequencies, 0, 0) - - if online_frequencies >= offline_frequencies do - 1 - else - 0 - end - end) - |> List.flatten() - |> Enum.map(&<<&1::1>>) - |> :erlang.list_to_bitstring() - end - - defp aggregate_node_average_availabilities(avg_availabilities) do - avg_availabilities - |> Enum.zip() - |> Enum.map(&Tuple.to_list/1) - |> Enum.map(fn avg_availabilities -> - Enum.sum(avg_availabilities) / length(avg_availabilities) - end) - end - - @doc """ - Download the beacon summary from the closest nodes given - """ - @spec download_summary(Crypto.versioned_hash(), list(Node.t()), binary()) :: - {:ok, BeaconSummary.t() | NotFound.t()} | {:error, any()} - def download_summary(_beacon_address, [], _), do: {:ok, %NotFound{}} - - def download_summary(beacon_address, nodes, patch) do - conflict_resolver = fn results -> - acc = %{ - transaction_attestations: [], - node_availabilities: [], - node_average_availabilities: [] - } - - %{transaction_attestations: transaction_attestations} = - results - |> Enum.filter(&match?(%BeaconSummary{}, &1)) - |> Enum.reduce(acc, &do_reduce_beacon_summaries/2) - - %BeaconSummary{transaction_attestations: transaction_attestations} - end - - nodes - |> P2P.nearest_nodes(patch) - |> P2P.quorum_read( - %GetBeaconSummary{address: beacon_address}, - conflict_resolver, - length(nodes) - ) - end - - @doc """ - Process beacon summary to synchronize the transactions involving. - - Each transactions from the beacon summary will be analyzed to determine - if the node is a storage node for this transaction. If so, it will download the - transaction from the closest storage nodes and replicate it locally. - - The P2P view will also be updated if some node information are inside the beacon chain to determine - the readiness or the availability of a node. - - Also, the number of transaction received during the beacon summary interval will be stored. - """ - @spec process_summary_aggregate(BeaconSummaryAggregate.t(), binary()) :: :ok - def process_summary_aggregate( - %BeaconSummaryAggregate{ - summary_time: summary_time, - transaction_summaries: transaction_summaries, - p2p_availabilities: p2p_availabilities - }, - node_patch - ) do - transaction_summaries - |> Enum.reject(&TransactionChain.transaction_exists?(&1.address)) - |> Enum.filter(&TransactionHandler.download_transaction?/1) - |> synchronize_transactions(node_patch) - - p2p_availabilities - |> Enum.reduce(%{}, fn {subset, - %{ - node_availabilities: node_availabilities, - node_average_availabilities: node_average_availabilities, - end_of_node_synchronizations: end_of_node_synchronizations - }}, - acc -> - sync_node(end_of_node_synchronizations) - - reduce_p2p_availabilities( - subset, - summary_time, - node_availabilities, - node_average_availabilities, - acc - ) - end) - |> Enum.each(&update_availabilities/1) - - update_statistics(summary_time, length(transaction_summaries)) - end - - defp synchronize_transactions([], _node_patch), do: :ok - - defp synchronize_transactions(transaction_summaries, node_patch) do - Logger.info("Need to synchronize #{Enum.count(transaction_summaries)} transactions") - Logger.debug("Transaction to sync #{inspect(transaction_summaries)}") - - Task.Supervisor.async_stream_nolink( - TaskSupervisor, - transaction_summaries, - &TransactionHandler.download_transaction(&1, node_patch), - on_timeout: :kill_task - ) - |> Stream.filter(&match?({:ok, _}, &1)) - |> Stream.each(fn {:ok, tx} -> - TransactionHandler.process_transaction(tx) - end) - |> Stream.run() - end - - defp sync_node(end_of_node_synchronizations) do - end_of_node_synchronizations - |> Enum.each(fn public_key -> P2P.set_node_globally_synced(public_key) end) - end - - defp reduce_p2p_availabilities( - subset, - time, - node_availabilities, - node_average_availabilities, - acc - ) do - node_list = Enum.filter(P2P.list_nodes(), &(DateTime.diff(&1.enrollment_date, time) <= 0)) - - subset_node_list = P2PSampling.list_nodes_to_sample(subset, node_list) - - node_availabilities - |> Utils.bitstring_to_integer_list() - |> Enum.with_index() - |> Enum.reduce(acc, fn {available_bit, index}, acc -> - node = Enum.at(subset_node_list, index) - avg_availability = Enum.at(node_average_availabilities, index) - - if available_bit == 1 and Node.synced?(node) do - Map.put(acc, node, %{available?: true, average_availability: avg_availability}) - else - Map.put(acc, node, %{available?: false, average_availability: avg_availability}) - end - end) - end - - defp update_availabilities( - {%Node{first_public_key: node_key}, - %{available?: available?, average_availability: avg_availability}} - ) do - DB.register_p2p_summary(node_key, DateTime.utc_now(), available?, avg_availability) - - if available? do - P2P.set_node_globally_available(node_key) - else - P2P.set_node_globally_unavailable(node_key) - P2P.set_node_globally_unsynced(node_key) - end - - P2P.set_node_average_availability(node_key, avg_availability) - end - - defp update_statistics(_date, 0), do: :ok - - defp update_statistics(date, nb_transactions) do - previous_summary_time = - date - |> Utils.truncate_datetime() - |> BeaconChain.previous_summary_time() - - nb_seconds = abs(DateTime.diff(previous_summary_time, date)) - tps = nb_transactions / nb_seconds - - DB.register_tps(date, tps, nb_transactions) - - Logger.info( - "TPS #{tps} on #{Utils.time_to_string(date)} with #{nb_transactions} transactions" - ) - - PubSub.notify_new_tps(tps, nb_transactions) - end -end diff --git a/lib/archethic/self_repair/sync/beacon_summary_handler/transaction_handler.ex b/lib/archethic/self_repair/sync/transaction_handler.ex similarity index 97% rename from lib/archethic/self_repair/sync/beacon_summary_handler/transaction_handler.ex rename to lib/archethic/self_repair/sync/transaction_handler.ex index a4e6296a8..e896a875f 100644 --- a/lib/archethic/self_repair/sync/beacon_summary_handler/transaction_handler.ex +++ b/lib/archethic/self_repair/sync/transaction_handler.ex @@ -1,4 +1,4 @@ -defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler.TransactionHandler do +defmodule Archethic.SelfRepair.Sync.TransactionHandler do @moduledoc false alias Archethic.Crypto diff --git a/lib/archethic_web/live/chains/beacon_live.ex b/lib/archethic_web/live/chains/beacon_live.ex index 0d8782329..6b838ae1e 100644 --- a/lib/archethic_web/live/chains/beacon_live.ex +++ b/lib/archethic_web/live/chains/beacon_live.ex @@ -5,7 +5,6 @@ defmodule ArchethicWeb.BeaconChainLive do alias Archethic.BeaconChain alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.BeaconChain.Slot - alias Archethic.BeaconChain.Summary, as: BeaconSummary alias Archethic.BeaconChain.SummaryTimer alias Archethic.Crypto @@ -17,8 +16,6 @@ defmodule ArchethicWeb.BeaconChainLive do alias Archethic.PubSub - alias Archethic.SelfRepair.Sync.BeaconSummaryHandler - alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.TransactionData @@ -228,32 +225,9 @@ defmodule ArchethicWeb.BeaconChainLive do end defp list_transactions_from_summary(date = %DateTime{}) do - %Node{network_patch: patch} = P2P.get_node_info() - - node_list = P2P.authorized_and_available_nodes() - - BeaconChain.list_subsets() - |> Flow.from_enumerable() - |> Flow.map(fn subset -> - nodes = Election.beacon_storage_nodes(subset, date, node_list) - {subset, nodes} - end) - |> Flow.partition(key: {:elem, 0}) - |> Flow.reduce(fn -> [] end, fn {subset, nodes}, acc -> - address = Crypto.derive_beacon_chain_address(subset, date, true) - - case BeaconSummaryHandler.download_summary(address, nodes, patch) do - {:ok, %BeaconSummary{transaction_attestations: attestations}} -> - tx_summaries = Enum.map(attestations, & &1.transaction_summary) - tx_summaries ++ acc - - _ -> - acc - end - end) - |> Enum.to_list() - |> List.flatten() - |> Enum.uniq_by(& &1.address) + [date] + |> BeaconChain.fetch_summary_aggregates() + |> Enum.flat_map(& &1.transaction_summaries) |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) end diff --git a/test/archethic/beacon_chain_test.exs b/test/archethic/beacon_chain_test.exs index 0e8c4f98d..806d0e518 100644 --- a/test/archethic/beacon_chain_test.exs +++ b/test/archethic/beacon_chain_test.exs @@ -2,22 +2,30 @@ defmodule Archethic.BeaconChainTest do use ArchethicCase alias Archethic.BeaconChain + alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.BeaconChain.Slot alias Archethic.BeaconChain.Slot.EndOfNodeSync alias Archethic.BeaconChain.SlotTimer alias Archethic.BeaconChain.Subset alias Archethic.BeaconChain.Subset.SummaryCache alias Archethic.BeaconChain.SubsetRegistry + alias Archethic.BeaconChain.Summary + alias Archethic.BeaconChain.SummaryAggregate alias Archethic.BeaconChain.SummaryTimer alias Archethic.Crypto + alias Archethic.Election + alias Archethic.P2P + alias Archethic.P2P.Message.GetBeaconSummaries + alias Archethic.P2P.Message.BeaconSummaryList alias Archethic.P2P.Node alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.Transaction.ValidationStamp alias Archethic.TransactionChain.TransactionData + alias Archethic.TransactionChain.TransactionSummary alias Archethic.Utils @@ -105,4 +113,325 @@ defmodule Archethic.BeaconChainTest do assert [%Slot{subset: <<0>>}] = SummaryCache.pop_slots(<<0>>) end end + + describe "fetch_summary_aggregates/1" do + setup do + summary_time = ~U[2021-01-22 16:12:58Z] + + node_keypair1 = Crypto.derive_keypair("node_seed", 1) + node_keypair2 = Crypto.derive_keypair("node_seed", 2) + node_keypair3 = Crypto.derive_keypair("node_seed", 3) + node_keypair4 = Crypto.derive_keypair("node_seed", 4) + + node1 = %Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: elem(node_keypair1, 0), + last_public_key: elem(node_keypair1, 0), + network_patch: "AAA", + geo_patch: "AAA", + available?: true, + authorization_date: summary_time |> DateTime.add(-10), + authorized?: true, + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + } + + node2 = %Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: elem(node_keypair2, 0), + last_public_key: elem(node_keypair2, 0), + network_patch: "AAA", + geo_patch: "AAA", + available?: true, + authorization_date: summary_time |> DateTime.add(-10), + authorized?: true, + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + } + + node3 = %Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: elem(node_keypair3, 0), + last_public_key: elem(node_keypair3, 0), + network_patch: "AAA", + geo_patch: "AAA", + available?: true, + authorization_date: summary_time |> DateTime.add(-10), + authorized?: true, + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + } + + node4 = %Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: elem(node_keypair4, 0), + last_public_key: elem(node_keypair4, 0), + network_patch: "AAA", + geo_patch: "AAA", + available?: true, + authorization_date: summary_time |> DateTime.add(-10), + authorized?: true, + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + } + + P2P.add_and_connect_node(node1) + P2P.add_and_connect_node(node2) + P2P.add_and_connect_node(node3) + P2P.add_and_connect_node(node4) + + P2P.add_and_connect_node(%Node{ + first_public_key: Crypto.last_node_public_key(), + network_patch: "AAA", + available?: false + }) + + {:ok, %{summary_time: summary_time, nodes: [node1, node2, node3, node4]}} + end + + test "should download the beacon summary", %{ + summary_time: summary_time, + nodes: [node1, node2, node3, node4] + } do + addr1 = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + + tx_summary = %TransactionSummary{ + address: addr1, + timestamp: DateTime.utc_now(), + type: :transfer, + fee: 100_000_000 + } + + storage_nodes = + Election.chain_storage_nodes_with_type(addr1, :transfer, P2P.available_nodes()) + + beacon_summary = %Summary{ + subset: "A", + summary_time: summary_time, + transaction_attestations: [ + %ReplicationAttestation{ + transaction_summary: tx_summary, + confirmations: + [node1, node2, node3, node4] + |> Enum.with_index(1) + |> Enum.map(fn {node, index} -> + node_index = Enum.find_index(storage_nodes, &(&1 == node)) + {_, pv} = Crypto.derive_keypair("node_seed", index) + {node_index, Crypto.sign(TransactionSummary.serialize(tx_summary), pv)} + end) + } + ] + } + + MockClient + |> stub(:send_message, fn + _, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [beacon_summary]}} + end) + + [%SummaryAggregate{transaction_summaries: transaction_summaries}] = + BeaconChain.fetch_summary_aggregates([summary_time]) + + assert [addr1] == Enum.map(transaction_summaries, & &1.address) + end + + test "should find other beacon summaries and aggregate missing summaries", %{ + summary_time: summary_time, + nodes: [node1, node2, node3, node4] + } do + addr1 = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + addr2 = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + + storage_nodes = + Election.chain_storage_nodes_with_type(addr1, :transfer, P2P.available_nodes()) + + tx_summary = %TransactionSummary{ + address: addr1, + timestamp: DateTime.utc_now(), + type: :transfer, + fee: 100_000_000 + } + + summary_v1 = %Summary{ + subset: "A", + summary_time: summary_time, + transaction_attestations: [ + %ReplicationAttestation{ + transaction_summary: tx_summary, + confirmations: + [node1, node2, node3, node4] + |> Enum.with_index(1) + |> Enum.map(fn {node, index} -> + node_index = Enum.find_index(storage_nodes, &(&1 == node)) + {_, pv} = Crypto.derive_keypair("node_seed", index) + {node_index, Crypto.sign(TransactionSummary.serialize(tx_summary), pv)} + end) + } + ] + } + + summary_v2 = %Summary{ + subset: "A", + summary_time: summary_time, + transaction_attestations: [ + %ReplicationAttestation{ + transaction_summary: tx_summary, + confirmations: + [node1, node2, node3, node4] + |> Enum.with_index(1) + |> Enum.map(fn {node, index} -> + node_index = Enum.find_index(storage_nodes, &(&1 == node)) + {_, pv} = Crypto.derive_keypair("node_seed", index) + {node_index, Crypto.sign(TransactionSummary.serialize(tx_summary), pv)} + end) + }, + %ReplicationAttestation{ + transaction_summary: %TransactionSummary{ + address: addr2, + timestamp: DateTime.utc_now(), + type: :transfer, + fee: 100_000_000 + }, + confirmations: + [node1, node2, node3, node4] + |> Enum.with_index(1) + |> Enum.map(fn {node, index} -> + node_index = Enum.find_index(storage_nodes, &(&1 == node)) + {_, pv} = Crypto.derive_keypair("node_seed", index) + {node_index, Crypto.sign(TransactionSummary.serialize(tx_summary), pv)} + end) + } + ] + } + + MockClient + |> stub(:send_message, fn + ^node1, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary_v1]}} + + ^node2, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary_v2]}} + + ^node3, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary_v1]}} + + ^node4, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary_v2]}} + end) + + [%SummaryAggregate{transaction_summaries: transaction_summaries}] = + BeaconChain.fetch_summary_aggregates([summary_time]) + + transaction_addresses = Enum.map(transaction_summaries, & &1.address) + + assert Enum.all?(transaction_addresses, &(&1 in [addr1, addr2])) + end + + test "should find other beacon summaries and aggregate node P2P views", %{ + summary_time: summary_time, + nodes: [node1, node2, node3, node4] + } do + summary_v1 = %Summary{ + subset: "A", + summary_time: summary_time, + node_availabilities: <<1::1, 1::1, 0::1>>, + end_of_node_synchronizations: [] + } + + summary_v2 = %Summary{ + subset: "A", + summary_time: summary_time, + node_availabilities: <<1::1, 1::1, 1::1>>, + end_of_node_synchronizations: [] + } + + summary_v3 = %Summary{ + subset: "A", + summary_time: summary_time, + node_availabilities: <<1::1, 0::1, 1::1>>, + end_of_node_synchronizations: [] + } + + summary_v4 = %Summary{ + subset: "A", + summary_time: summary_time, + node_availabilities: <<1::1, 1::1, 0::1>>, + end_of_node_synchronizations: [] + } + + MockClient + |> stub(:send_message, fn + ^node1, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary_v1]}} + + ^node2, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary_v2]}} + + ^node3, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary_v3]}} + + ^node4, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary_v4]}} + end) + + assert [ + %SummaryAggregate{ + p2p_availabilities: %{"A" => %{node_availabilities: <<1::1, 1::1, 1::1>>}} + } + ] = BeaconChain.fetch_summary_aggregates([summary_time]) + end + + test "should find other beacon summaries and aggregate node P2P avg availabilities", %{ + summary_time: summary_time, + nodes: [node1, node2, node3, node4] + } do + summary_v1 = %Summary{ + subset: "A", + summary_time: summary_time, + node_average_availabilities: [1.0, 0.9, 1.0, 1.0] + } + + summary_v2 = %Summary{ + subset: "A", + summary_time: summary_time, + node_average_availabilities: [0.90, 0.9, 1.0, 1.0] + } + + summary_v3 = %Summary{ + subset: "A", + summary_time: summary_time, + node_average_availabilities: [0.8, 0.9, 0.7, 1.0] + } + + summary_v4 = %Summary{ + subset: "A", + summary_time: summary_time, + node_availabilities: <<1::1, 1::1, 0::1>>, + node_average_availabilities: [1, 0.5, 1, 0.4] + } + + MockClient + |> stub(:send_message, fn + ^node1, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary_v1]}} + + ^node2, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary_v2]}} + + ^node3, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary_v3]}} + + ^node4, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary_v4]}} + end) + + assert [ + %SummaryAggregate{ + p2p_availabilities: %{ + "A" => %{node_average_availabilities: [0.925, 0.8, 0.925, 0.85]} + } + } + ] = BeaconChain.fetch_summary_aggregates([summary_time]) + end + end end diff --git a/test/archethic/self_repair/sync/beacon_summary_handler_test.exs b/test/archethic/self_repair/sync/beacon_summary_handler_test.exs deleted file mode 100644 index c1bc96fb0..000000000 --- a/test/archethic/self_repair/sync/beacon_summary_handler_test.exs +++ /dev/null @@ -1,486 +0,0 @@ -defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandlerTest do - use ArchethicCase, async: false - - alias Archethic.BeaconChain.ReplicationAttestation - alias Archethic.BeaconChain.SlotTimer, as: BeaconSlotTimer - alias Archethic.BeaconChain.Summary, as: BeaconSummary - alias Archethic.BeaconChain.SummaryTimer, as: BeaconSummaryTimer - - alias Archethic.Crypto - - alias Archethic.Election - - alias Archethic.P2P - alias Archethic.P2P.Message.GetBeaconSummary - alias Archethic.P2P.Message.GetTransaction - alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.GetTransactionInputs - alias Archethic.P2P.Message.GetUnspentOutputs - alias Archethic.P2P.Message.NotFound - alias Archethic.P2P.Message.TransactionInputList - alias Archethic.P2P.Message.TransactionList - alias Archethic.P2P.Message.UnspentOutputList - alias Archethic.P2P.Node - - alias Archethic.SharedSecrets.MemTables.NetworkLookup - - alias Archethic.SelfRepair.Sync.BeaconSummaryHandler - alias Archethic.SelfRepair.Sync.BeaconSummaryAggregate - - alias Archethic.TransactionFactory - - alias Archethic.TransactionChain.TransactionInput - alias Archethic.TransactionChain.TransactionSummary - - import Mox - - setup do - P2P.add_and_connect_node(%Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: Crypto.first_node_public_key(), - last_public_key: Crypto.first_node_public_key(), - network_patch: "AAA", - geo_patch: "AAA", - reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, - origin_public_key: Crypto.origin_node_public_key() - }) - - Crypto.generate_deterministic_keypair("daily_nonce_seed") - |> elem(0) - |> NetworkLookup.set_daily_nonce_public_key(DateTime.utc_now() |> DateTime.add(-10)) - - Archethic.SelfRepair.Scheduler.start_link([interval: "0 0 0 * * *"], []) - - :ok - end - - describe "get_full_beacon_summary/3" do - setup do - summary_time = ~U[2021-01-22 16:12:58Z] - - node_keypair1 = Crypto.derive_keypair("node_seed", 1) - node_keypair2 = Crypto.derive_keypair("node_seed", 2) - node_keypair3 = Crypto.derive_keypair("node_seed", 3) - node_keypair4 = Crypto.derive_keypair("node_seed", 4) - - node1 = %Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: elem(node_keypair1, 0), - last_public_key: elem(node_keypair1, 0), - network_patch: "AAA", - geo_patch: "AAA", - available?: true, - authorization_date: summary_time |> DateTime.add(-10), - authorized?: true, - reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> - } - - node2 = %Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: elem(node_keypair2, 0), - last_public_key: elem(node_keypair2, 0), - network_patch: "AAA", - geo_patch: "AAA", - available?: true, - authorization_date: summary_time |> DateTime.add(-10), - authorized?: true, - reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> - } - - node3 = %Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: elem(node_keypair3, 0), - last_public_key: elem(node_keypair3, 0), - network_patch: "AAA", - geo_patch: "AAA", - available?: true, - authorization_date: summary_time |> DateTime.add(-10), - authorized?: true, - reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> - } - - node4 = %Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: elem(node_keypair4, 0), - last_public_key: elem(node_keypair4, 0), - network_patch: "AAA", - geo_patch: "AAA", - available?: true, - authorization_date: summary_time |> DateTime.add(-10), - authorized?: true, - reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> - } - - P2P.add_and_connect_node(node1) - P2P.add_and_connect_node(node2) - P2P.add_and_connect_node(node3) - P2P.add_and_connect_node(node4) - - P2P.add_and_connect_node(%Node{ - first_public_key: Crypto.last_node_public_key(), - network_patch: "AAA", - available?: false - }) - - {:ok, %{summary_time: summary_time, nodes: [node1, node2, node3, node4]}} - end - - test "should download the beacon summary", %{ - summary_time: summary_time, - nodes: [node1, node2, node3, node4] - } do - addr1 = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> - - beacon_summary_address_a = Crypto.derive_beacon_chain_address("A", summary_time, true) - - tx_summary = %TransactionSummary{ - address: addr1, - timestamp: DateTime.utc_now(), - type: :transfer, - fee: 100_000_000 - } - - storage_nodes = - Election.chain_storage_nodes_with_type(addr1, :transfer, P2P.available_nodes()) - - beacon_summary = %BeaconSummary{ - subset: "A", - summary_time: summary_time, - transaction_attestations: [ - %ReplicationAttestation{ - transaction_summary: tx_summary, - confirmations: - [node1, node2, node3, node4] - |> Enum.with_index(1) - |> Enum.map(fn {node, index} -> - node_index = Enum.find_index(storage_nodes, &(&1 == node)) - {_, pv} = Crypto.derive_keypair("node_seed", index) - {node_index, Crypto.sign(TransactionSummary.serialize(tx_summary), pv)} - end) - } - ] - } - - MockClient - |> stub(:send_message, fn - _, %GetBeaconSummary{address: ^beacon_summary_address_a}, _ -> - {:ok, beacon_summary} - end) - - %BeaconSummary{transaction_attestations: transaction_attestations} = - BeaconSummaryHandler.get_full_beacon_summary(summary_time, "A", [ - node1, - node2, - node3, - node4 - ]) - - assert [addr1] == Enum.map(transaction_attestations, & &1.transaction_summary.address) - end - - test "should find other beacon summaries and aggregate missing summaries", %{ - summary_time: summary_time, - nodes: [node1, node2, _, _] - } do - addr1 = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> - addr2 = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> - - summary_v1 = %BeaconSummary{ - subset: "A", - summary_time: summary_time, - transaction_attestations: [ - %ReplicationAttestation{ - transaction_summary: %TransactionSummary{ - address: addr1, - timestamp: DateTime.utc_now(), - type: :transfer, - fee: 100_000_000 - } - } - ] - } - - summary_v2 = %BeaconSummary{ - subset: "A", - summary_time: summary_time, - transaction_attestations: [ - %ReplicationAttestation{ - transaction_summary: %TransactionSummary{ - address: addr1, - timestamp: DateTime.utc_now(), - type: :transfer, - fee: 100_000_000 - } - }, - %ReplicationAttestation{ - transaction_summary: %TransactionSummary{ - address: addr2, - timestamp: DateTime.utc_now(), - type: :transfer, - fee: 100_000_000 - } - } - ] - } - - MockClient - |> stub(:send_message, fn - ^node1, %GetBeaconSummary{}, _ -> - {:ok, summary_v1} - - ^node2, %GetBeaconSummary{}, _ -> - {:ok, summary_v2} - end) - - %BeaconSummary{transaction_attestations: transaction_attestations} = - BeaconSummaryHandler.get_full_beacon_summary(summary_time, "A", [node1, node2]) - - transaction_addresses = Enum.map(transaction_attestations, & &1.transaction_summary.address) - - assert Enum.all?(transaction_addresses, &(&1 in [addr1, addr2])) - end - - test "should find other beacon summaries and aggregate node P2P views", %{ - summary_time: summary_time, - nodes: [node1, node2, node3, node4] - } do - summary_v1 = %BeaconSummary{ - subset: "A", - summary_time: summary_time, - node_availabilities: <<1::1, 1::1, 0::1>>, - end_of_node_synchronizations: [] - } - - summary_v2 = %BeaconSummary{ - subset: "A", - summary_time: summary_time, - node_availabilities: <<1::1, 1::1, 1::1>>, - end_of_node_synchronizations: [] - } - - summary_v3 = %BeaconSummary{ - subset: "A", - summary_time: summary_time, - node_availabilities: <<1::1, 0::1, 1::1>>, - end_of_node_synchronizations: [] - } - - summary_v4 = %BeaconSummary{ - subset: "A", - summary_time: summary_time, - node_availabilities: <<1::1, 1::1, 0::1>>, - end_of_node_synchronizations: [] - } - - MockClient - |> stub(:send_message, fn - ^node1, %GetBeaconSummary{}, _ -> - {:ok, summary_v1} - - ^node2, %GetBeaconSummary{}, _ -> - {:ok, summary_v2} - - ^node3, %GetBeaconSummary{}, _ -> - {:ok, summary_v3} - - ^node4, %GetBeaconSummary{}, _ -> - {:ok, summary_v4} - end) - - assert %BeaconSummary{node_availabilities: <<1::1, 1::1, 1::1>>} = - BeaconSummaryHandler.get_full_beacon_summary(summary_time, "A", [ - node1, - node2, - node3, - node4 - ]) - end - - test "should find other beacon summaries and aggregate node P2P avg availabilities", %{ - summary_time: summary_time, - nodes: [node1, node2, node3, node4] - } do - summary_v1 = %BeaconSummary{ - subset: "A", - summary_time: summary_time, - node_average_availabilities: [1.0, 0.9, 1.0, 1.0] - } - - summary_v2 = %BeaconSummary{ - subset: "A", - summary_time: summary_time, - node_average_availabilities: [0.90, 0.9, 1.0, 1.0] - } - - summary_v3 = %BeaconSummary{ - subset: "A", - summary_time: summary_time, - node_average_availabilities: [0.8, 0.9, 0.7, 1.0] - } - - summary_v4 = %BeaconSummary{ - subset: "A", - summary_time: summary_time, - node_availabilities: <<1::1, 1::1, 0::1>>, - node_average_availabilities: [1, 0.5, 1, 0.4] - } - - MockClient - |> stub(:send_message, fn - ^node1, %GetBeaconSummary{}, _ -> - {:ok, summary_v1} - - ^node2, %GetBeaconSummary{}, _ -> - {:ok, summary_v2} - - ^node3, %GetBeaconSummary{}, _ -> - {:ok, summary_v3} - - ^node4, %GetBeaconSummary{}, _ -> - {:ok, summary_v4} - end) - - assert %BeaconSummary{node_average_availabilities: [0.925, 0.8, 0.925, 0.85]} = - BeaconSummaryHandler.get_full_beacon_summary(summary_time, "A", [ - node1, - node2, - node3, - node4 - ]) - end - end - - describe "process_summary_aggregate/2" do - setup do - start_supervised!({BeaconSlotTimer, [interval: "* * * * * *"]}) - start_supervised!({BeaconSummaryTimer, [interval: "0 * * * * *"]}) - :ok - end - - test "should synchronize transactions" do - inputs = [ - %TransactionInput{ - from: "@Alice2", - amount: 1_000_000_000, - type: :UCO, - timestamp: DateTime.utc_now() - } - ] - - create_p2p_context() - - transfer_tx = - TransactionFactory.create_valid_transaction(inputs, - seed: "transfer_seed" - ) - - inputs = [ - %TransactionInput{ - from: "@Alice2", - amount: 1_000_000_000, - type: :UCO, - timestamp: DateTime.utc_now() - } - ] - - tx_address = transfer_tx.address - - me = self() - - MockDB - |> stub(:write_transaction, fn ^transfer_tx -> - send(me, :transaction_stored) - :ok - end) - - MockClient - |> stub(:send_message, fn - _, %GetTransaction{address: ^tx_address}, _ -> - {:ok, transfer_tx} - - _, %GetTransaction{address: _}, _ -> - {:ok, %NotFound{}} - - _, %GetTransactionChain{}, _ -> - {:ok, %TransactionList{transactions: []}} - - _, %GetTransactionInputs{address: _}, _ -> - {:ok, %TransactionInputList{inputs: inputs}} - - _, %GetUnspentOutputs{}, _ -> - {:ok, %UnspentOutputList{unspent_outputs: inputs}} - end) - - MockDB - |> stub(:register_tps, fn _, _, _ -> - :ok - end) - - assert :ok = - BeaconSummaryHandler.process_summary_aggregate( - %BeaconSummaryAggregate{ - summary_time: DateTime.utc_now(), - transaction_summaries: [ - %TransactionSummary{ - address: tx_address, - type: :transfer, - timestamp: DateTime.utc_now() - } - ] - }, - "AAA" - ) - - assert_received :transaction_stored - end - end - - defp create_p2p_context do - welcome_node = %Node{ - first_public_key: "key1", - last_public_key: "key1", - available?: true, - geo_patch: "BBB", - network_patch: "BBB", - reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, - enrollment_date: DateTime.utc_now() - } - - coordinator_node = %Node{ - first_public_key: Crypto.first_node_public_key(), - last_public_key: Crypto.last_node_public_key(), - authorized?: true, - available?: true, - authorization_date: DateTime.utc_now() |> DateTime.add(-10), - geo_patch: "AAA", - network_patch: "AAA", - reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, - enrollment_date: DateTime.utc_now() - } - - storage_nodes = [ - %Node{ - ip: {127, 0, 0, 1}, - port: 3000, - http_port: 4000, - first_public_key: "key3", - last_public_key: "key3", - available?: true, - geo_patch: "BBB", - network_patch: "BBB", - reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, - authorization_date: DateTime.utc_now() - } - ] - - Enum.each(storage_nodes, &P2P.add_and_connect_node(&1)) - - P2P.add_and_connect_node(welcome_node) - P2P.add_and_connect_node(coordinator_node) - end -end diff --git a/test/archethic/self_repair/sync/beacon_summary_handler/transaction_handler_test.exs b/test/archethic/self_repair/sync/transaction_handler_test.exs similarity index 96% rename from test/archethic/self_repair/sync/beacon_summary_handler/transaction_handler_test.exs rename to test/archethic/self_repair/sync/transaction_handler_test.exs index 7cb41087f..f61c0270e 100644 --- a/test/archethic/self_repair/sync/beacon_summary_handler/transaction_handler_test.exs +++ b/test/archethic/self_repair/sync/transaction_handler_test.exs @@ -1,4 +1,4 @@ -defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler.TransactionHandlerTest do +defmodule Archethic.SelfRepair.Sync.TransactionHandlerTest do use ArchethicCase alias Archethic.BeaconChain @@ -18,7 +18,7 @@ defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler.TransactionHandlerTest alias Archethic.P2P.Message.UnspentOutputList alias Archethic.P2P.Node - alias Archethic.SelfRepair.Sync.BeaconSummaryHandler.TransactionHandler + alias Archethic.SelfRepair.Sync.TransactionHandler alias Archethic.SharedSecrets.MemTables.NetworkLookup alias Archethic.TransactionFactory diff --git a/test/archethic/self_repair/sync_test.exs b/test/archethic/self_repair/sync_test.exs index 9557f6bb9..6917301de 100644 --- a/test/archethic/self_repair/sync_test.exs +++ b/test/archethic/self_repair/sync_test.exs @@ -4,6 +4,7 @@ defmodule Archethic.SelfRepair.SyncTest do alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.BeaconChain.SlotTimer, as: BeaconSlotTimer alias Archethic.BeaconChain.Summary, as: BeaconSummary + alias Archethic.BeaconChain.SummaryAggregate alias Archethic.BeaconChain.SummaryTimer, as: BeaconSummaryTimer alias Archethic.Crypto @@ -11,7 +12,8 @@ defmodule Archethic.SelfRepair.SyncTest do alias Archethic.Election alias Archethic.P2P - alias Archethic.P2P.Message.GetBeaconSummary + alias Archethic.P2P.Message.BeaconSummaryList + alias Archethic.P2P.Message.GetBeaconSummaries alias Archethic.P2P.Message.GetTransaction alias Archethic.P2P.Message.GetTransactionChain alias Archethic.P2P.Message.GetTransactionInputs @@ -227,8 +229,8 @@ defmodule Archethic.SelfRepair.SyncTest do MockClient |> stub(:send_message, fn - _, %GetBeaconSummary{}, _ -> - {:ok, summary} + _, %GetBeaconSummaries{}, _ -> + {:ok, %BeaconSummaryList{summaries: [summary]}} _, %GetTransaction{address: ^tx_address}, _ -> {:ok, tx} @@ -258,4 +260,133 @@ defmodule Archethic.SelfRepair.SyncTest do assert_received :storage end end + + describe "process_summary_aggregate/2" do + setup do + start_supervised!({BeaconSlotTimer, [interval: "* * * * * *"]}) + start_supervised!({BeaconSummaryTimer, [interval: "0 * * * * *"]}) + :ok + end + + test "should synchronize transactions" do + inputs = [ + %TransactionInput{ + from: "@Alice2", + amount: 1_000_000_000, + type: :UCO, + timestamp: DateTime.utc_now() + } + ] + + create_p2p_context() + + transfer_tx = + TransactionFactory.create_valid_transaction(inputs, + seed: "transfer_seed" + ) + + inputs = [ + %TransactionInput{ + from: "@Alice2", + amount: 1_000_000_000, + type: :UCO, + timestamp: DateTime.utc_now() + } + ] + + tx_address = transfer_tx.address + + me = self() + + MockDB + |> stub(:write_transaction, fn ^transfer_tx -> + send(me, :transaction_stored) + :ok + end) + + MockClient + |> stub(:send_message, fn + _, %GetTransaction{address: ^tx_address}, _ -> + {:ok, transfer_tx} + + _, %GetTransaction{address: _}, _ -> + {:ok, %NotFound{}} + + _, %GetTransactionChain{}, _ -> + {:ok, %TransactionList{transactions: []}} + + _, %GetTransactionInputs{address: _}, _ -> + {:ok, %TransactionInputList{inputs: inputs}} + + _, %GetUnspentOutputs{}, _ -> + {:ok, %UnspentOutputList{unspent_outputs: inputs}} + end) + + MockDB + |> stub(:register_tps, fn _, _, _ -> + :ok + end) + + assert :ok = + Sync.process_summary_aggregate( + %SummaryAggregate{ + summary_time: DateTime.utc_now(), + transaction_summaries: [ + %TransactionSummary{ + address: tx_address, + type: :transfer, + timestamp: DateTime.utc_now() + } + ] + }, + "AAA" + ) + + assert_received :transaction_stored + end + end + + defp create_p2p_context do + welcome_node = %Node{ + first_public_key: "key1", + last_public_key: "key1", + available?: true, + geo_patch: "BBB", + network_patch: "BBB", + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + enrollment_date: DateTime.utc_now() + } + + coordinator_node = %Node{ + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.last_node_public_key(), + authorized?: true, + available?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + geo_patch: "AAA", + network_patch: "AAA", + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + enrollment_date: DateTime.utc_now() + } + + storage_nodes = [ + %Node{ + ip: {127, 0, 0, 1}, + port: 3000, + http_port: 4000, + first_public_key: "key3", + last_public_key: "key3", + available?: true, + geo_patch: "BBB", + network_patch: "BBB", + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + authorization_date: DateTime.utc_now() + } + ] + + Enum.each(storage_nodes, &P2P.add_and_connect_node(&1)) + + P2P.add_and_connect_node(welcome_node) + P2P.add_and_connect_node(coordinator_node) + end end