diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index 1b02ba741..e6fe4c14c 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -180,7 +180,10 @@ defmodule Archethic.P2P do case nodes do [] -> # Only happen during bootstrap - get_first_enrolled_node() + case get_first_enrolled_node() do + nil -> [] + node -> [node] + end nodes -> nodes @@ -211,18 +214,25 @@ defmodule Archethic.P2P do case nodes do [] -> # Only happen for init transactions so we take the first enrolled node - get_first_enrolled_node() + case get_first_enrolled_node() do + nil -> [] + node -> [node] + end nodes -> nodes end end - defp get_first_enrolled_node() do + @doc """ + Return the first enrolled node + """ + @spec get_first_enrolled_node() :: Node.t() | nil + def get_first_enrolled_node() do list_nodes() |> Enum.reject(&(&1.enrollment_date == nil)) |> Enum.sort_by(& &1.enrollment_date, {:asc, DateTime}) - |> Enum.take(1) + |> List.first() end @doc """ diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index e3005a70e..140a309bf 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -1873,10 +1873,16 @@ defmodule Archethic.P2P.Message do ) do # Ensure all addresses are expected to be replicated nodes = P2P.authorized_and_available_nodes() - addresses = [storage_address | io_addresses] + + addresses = + if storage_address != nil, do: [storage_address | io_addresses], else: io_addresses + public_key = Crypto.first_node_public_key() - if Enum.all?(addresses, &(Election.storage_nodes(&1, nodes) |> Enum.member?(public_key))) do + if Enum.all?( + addresses, + &(Election.storage_nodes(&1, nodes) |> Utils.key_in_node_list?(public_key)) + ) do case SelfRepair.repair_in_progress?(first_address) do false -> SelfRepair.start_worker( diff --git a/lib/archethic/self_repair/notifier.ex b/lib/archethic/self_repair/notifier.ex index ba9abe340..886727aef 100644 --- a/lib/archethic/self_repair/notifier.ex +++ b/lib/archethic/self_repair/notifier.ex @@ -19,12 +19,15 @@ defmodule Archethic.SelfRepair.Notifier do """ alias Archethic.{ + BeaconChain, Crypto, Election, P2P, + P2P.Node, P2P.Message.ShardRepair, TransactionChain, - TransactionChain.Transaction + TransactionChain.Transaction, + Utils } alias Archethic.TransactionChain.Transaction.{ @@ -59,7 +62,12 @@ defmodule Archethic.SelfRepair.Notifier do prev_available_nodes = Keyword.fetch!(data, :prev_available_nodes) new_available_nodes = Keyword.fetch!(data, :new_available_nodes) + Logger.info( + "Start Notifier due to a topology change #{inspect(Enum.map(unavailable_nodes, &Base.encode16(&1)))}" + ) + repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes) + repair_summaries_aggregate(prev_available_nodes, new_available_nodes) {:stop, :normal, data} end @@ -70,10 +78,6 @@ defmodule Archethic.SelfRepair.Notifier do """ @spec repair_transactions(list(Crypto.key()), list(Node.t()), list(Node.t())) :: :ok def repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes) do - Logger.debug( - "Trying to repair transactions due to a topology change #{inspect(Enum.map(unavailable_nodes, &Base.encode16(&1)))}" - ) - # We fetch all the transactions existing and check if the disconnected nodes were in storage nodes TransactionChain.stream_first_addresses() |> Stream.reject(&network_chain?(&1)) @@ -231,7 +235,8 @@ defmodule Archethic.SelfRepair.Notifier do acc, fn {node_first_public_key, %{last_address: last_address, io_addresses: io_addresses}} -> Logger.info( - "Send Shard Repair message to #{Base.encode16(node_first_public_key)} with storage_address #{Base.encode16(last_address)}, " <> + "Send Shard Repair message to #{Base.encode16(node_first_public_key)}" <> + "with storage_address #{if last_address, do: Base.encode16(last_address), else: nil}, " <> "io_addresses #{inspect(Enum.map(io_addresses, &Base.encode16(&1)))}", address: Base.encode16(first_address) ) @@ -247,4 +252,59 @@ defmodule Archethic.SelfRepair.Notifier do ) |> Stream.run() end + + @doc """ + For each beacon aggregate, calculate the new election and store it if the node needs to + """ + @spec repair_summaries_aggregate(list(Node.t()), list(Node.t())) :: :ok + def repair_summaries_aggregate(prev_available_nodes, new_available_nodes) do + %Node{enrollment_date: first_enrollment_date} = P2P.get_first_enrolled_node() + + first_enrollment_date + |> BeaconChain.next_summary_dates() + |> Stream.filter(&download?(&1, new_available_nodes)) + |> Stream.chunk_every(20) + |> Stream.each(fn summary_times -> + Task.Supervisor.async_stream_nolink( + Archethic.TaskSupervisor, + summary_times, + &download_and_store_summary(&1, prev_available_nodes), + ordered: false, + on_timeout: :kill_task + ) + |> Stream.run() + end) + |> Stream.run() + end + + defp download?(summary_time, new_available_nodes) do + in_new_election? = + summary_time + |> Crypto.derive_beacon_aggregate_address() + |> Election.chain_storage_nodes(new_available_nodes) + |> Utils.key_in_node_list?(Crypto.first_node_public_key()) + + if in_new_election? do + case BeaconChain.get_summaries_aggregate(summary_time) do + {:ok, _} -> false + {:error, _} -> true + end + else + false + end + end + + defp download_and_store_summary(summary_time, prev_available_nodes) do + case BeaconChain.fetch_summaries_aggregate(summary_time, prev_available_nodes) do + {:ok, aggregate} -> + Logger.debug("Notifier store beacon aggregate for #{summary_time}") + BeaconChain.write_summaries_aggregate(aggregate) + + error -> + Logger.warning( + "Notifier cannot fetch summary aggregate for date #{summary_time} " <> + "because of #{inspect(error)}" + ) + end + end end diff --git a/lib/archethic/self_repair/repair_worker.ex b/lib/archethic/self_repair/repair_worker.ex index bfc7531e2..d2113bd33 100644 --- a/lib/archethic/self_repair/repair_worker.ex +++ b/lib/archethic/self_repair/repair_worker.ex @@ -28,7 +28,7 @@ defmodule Archethic.SelfRepair.RepairWorker do Registry.register(RepairRegistry, first_address, []) Logger.info( - "Notifier Repair Worker start with storage_address #{Base.encode16(storage_address)}, " <> + "Notifier Repair Worker start with storage_address #{if storage_address, do: Base.encode16(storage_address), else: nil}, " <> "io_addresses #{inspect(Enum.map(io_addresses, &Base.encode16(&1)))}", address: Base.encode16(first_address) ) diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 00f3ffde6..bdf8738a7 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -268,9 +268,9 @@ defmodule Archethic.SelfRepair.Sync do |> Enum.map(&update_availabilities(&1, availability_update)) |> DB.register_p2p_summary() - if :persistent_term.get(:archethic_up, nil) == :up do - new_available_nodes = P2P.authorized_and_available_nodes(availability_update) + new_available_nodes = P2P.authorized_and_available_nodes(availability_update) + if :persistent_term.get(:archethic_up, nil) == :up do SelfRepair.start_notifier( previous_available_nodes, new_available_nodes, @@ -280,7 +280,7 @@ defmodule Archethic.SelfRepair.Sync do update_statistics(summary_time, transaction_summaries) - store_aggregate(aggregate) + store_aggregate(aggregate, new_available_nodes) end defp synchronize_transactions([], _), do: :ok @@ -387,9 +387,11 @@ defmodule Archethic.SelfRepair.Sync do PubSub.notify_new_tps(tps, nb_transactions) end - defp store_aggregate(aggregate = %SummaryAggregate{summary_time: summary_time}) do - node_list = - [P2P.get_node_info() | P2P.authorized_and_available_nodes()] |> P2P.distinct_nodes() + defp store_aggregate( + aggregate = %SummaryAggregate{summary_time: summary_time}, + new_available_nodes + ) do + node_list = [P2P.get_node_info() | new_available_nodes] |> P2P.distinct_nodes() should_store? = summary_time diff --git a/test/archethic/self_repair/notifier_test.exs b/test/archethic/self_repair/notifier_test.exs index 3d72e5625..9c0b65e3d 100644 --- a/test/archethic/self_repair/notifier_test.exs +++ b/test/archethic/self_repair/notifier_test.exs @@ -4,11 +4,15 @@ defmodule Archethic.SelfRepair.NotifierTest do import Mox + alias Archethic.BeaconChain.SummaryAggregate + alias Archethic.BeaconChain.SummaryTimer + alias Archethic.Crypto alias Archethic.Election alias Archethic.P2P + alias Archethic.P2P.Message.GetBeaconSummariesAggregate alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.ShardRepair alias Archethic.P2P.Node @@ -92,7 +96,7 @@ defmodule Archethic.SelfRepair.NotifierTest do assert ^expected = Notifier.map_last_addresses_for_node(tab) end - test "repair_transactions/1 should send message to new storage nodes" do + test "repair_transactions/3 should send message to new storage nodes" do node = %Node{ first_public_key: Crypto.first_node_public_key(), last_public_key: Crypto.last_node_public_key(), @@ -181,4 +185,75 @@ defmodule Archethic.SelfRepair.NotifierTest do assert_receive :new_node refute_receive :new_node, 200 end + + test "repair_summaries_aggregate/2 should store beacon aggregate" do + enrollment_date = DateTime.utc_now() |> DateTime.add(-10, :minute) + + node = %Node{ + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.last_node_public_key(), + geo_patch: "AAA", + network_patch: "AAA", + authorized?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-11, :minute), + available?: true, + enrollment_date: enrollment_date, + availability_history: <<1::1>> + } + + P2P.add_and_connect_node(node) + + nodes = + Enum.map(1..9, fn nb -> + %Node{ + first_public_key: "node#{nb}", + last_public_key: "node#{nb}", + geo_patch: "AAA", + network_patch: "AAA", + authorized?: true, + authorization_date: DateTime.utc_now(), + available?: true, + availability_history: <<1::1>> + } + end) + + nodes = [node | nodes] + + start_supervised!({SummaryTimer, interval: "0 * * * *"}) + + [first_date | rest] = SummaryTimer.next_summaries(enrollment_date) |> Enum.to_list() + random_date = Enum.random(rest) + + me = self() + + MockDB + |> stub(:get_beacon_summaries_aggregate, fn + summary_time when summary_time in [first_date, random_date] -> + {:error, :not_exists} + + summary_time -> + {:ok, %SummaryAggregate{summary_time: summary_time}} + end) + |> expect(:write_beacon_summaries_aggregate, 2, fn + %SummaryAggregate{summary_time: summary_time} when summary_time == first_date -> + send(me, :write_first_date) + + %SummaryAggregate{summary_time: summary_time} when summary_time == random_date -> + send(me, :write_random_date) + + _ -> + send(me, :unexpected) + end) + + MockClient + |> stub(:send_message, fn _, %GetBeaconSummariesAggregate{date: summary_time}, _ -> + {:ok, %SummaryAggregate{summary_time: summary_time}} + end) + + Notifier.repair_summaries_aggregate(nodes, nodes) + + assert_receive :write_first_date + assert_receive :write_random_date + refute_receive :unexpected + end end