From 25ed195b8776849b3dcdbbb9b497239c41f4dadc Mon Sep 17 00:00:00 2001 From: Neylix Date: Mon, 12 Jun 2023 11:19:34 +0200 Subject: [PATCH 1/3] Refactor post_bootstrap function --- lib/archethic/bootstrap.ex | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/lib/archethic/bootstrap.ex b/lib/archethic/bootstrap.ex index 0a4924340..a6d1993c0 100644 --- a/lib/archethic/bootstrap.ex +++ b/lib/archethic/bootstrap.ex @@ -117,10 +117,10 @@ defmodule Archethic.Bootstrap do network_patch, reward_address ) - else - post_bootstrap() end + post_bootstrap() + Logger.info("Bootstrapping finished!") end @@ -170,7 +170,6 @@ defmodule Archethic.Bootstrap do Sync.initialize_network(tx) - post_bootstrap() SelfRepair.put_last_sync_date(DateTime.utc_now()) Crypto.first_node_public_key() == Crypto.previous_node_public_key() -> @@ -186,8 +185,6 @@ defmodule Archethic.Bootstrap do configured_reward_address ) - post_bootstrap() - true -> Logger.info("Update node chain...") @@ -208,8 +205,6 @@ defmodule Archethic.Bootstrap do bootstrapping_seeds, last_reward_address ) - - post_bootstrap() end end From 22465177908bd94131a0478aac530a071c367ece Mon Sep 17 00:00:00 2001 From: Neylix Date: Mon, 12 Jun 2023 11:48:28 +0200 Subject: [PATCH 2/3] Refactor bootstrap_sync function --- lib/archethic/self_repair.ex | 53 +++++++++++++----------------------- 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/lib/archethic/self_repair.ex b/lib/archethic/self_repair.ex index 231e9f16e..2907c4d3a 100755 --- a/lib/archethic/self_repair.ex +++ b/lib/archethic/self_repair.ex @@ -45,35 +45,10 @@ defmodule Archethic.SelfRepair do @doc """ Start the bootstrap's synchronization process using the last synchronization date """ - @spec bootstrap_sync(last_sync_date :: DateTime.t()) :: :ok - def bootstrap_sync(date = %DateTime{}) do - # Loading transactions can take a lot of time to be achieve and can overpass an epoch. - # So to avoid missing a beacon summary epoch, we save the starting date and update the last sync date with it - # at the end of loading (in case there is a crash during self repair). - - # Summary time after the the last synchronization date - summary_time = BeaconChain.next_summary_date(date) - - # Before the first summary date, synchronization is useless - # as no data have been aggregated - if DateTime.diff(DateTime.utc_now(), summary_time) >= 0 do - loaded_missed_transactions? = - :ok == - 0..@max_retry_count - |> Enum.reduce_while(:error, fn _, _ -> - try do - :ok = Sync.load_missed_transactions(date) - {:halt, :ok} - catch - error, message -> - Logger.error("Error during self repair #{error} #{message}") - {:cont, :error} - end - end) - - if loaded_missed_transactions? do - Logger.info("Bootstrap Sync succeded in loading missed transactions !") - + @spec bootstrap_sync(last_sync_date :: DateTime.t()) :: :ok | :error + def bootstrap_sync(last_sync_date = %DateTime{}) do + case sync_with_retry(last_sync_date) do + :ok -> # At the end of self repair, if a new beacon summary as been created # we run bootstrap_sync again until the last beacon summary is loaded last_sync_date = last_sync_date() @@ -87,16 +62,26 @@ defmodule Archethic.SelfRepair do _ -> :ok end - else + + :error -> Logger.error( "Bootstrap Sync failed to load missed transactions after max retry of #{@max_retry_count} !" ) + end + end - :error + defp sync_with_retry(last_sync_date) do + 0..@max_retry_count + |> Enum.reduce_while(:error, fn _, _ -> + try do + :ok = Sync.load_missed_transactions(last_sync_date) + {:halt, :ok} + catch + error, message -> + Logger.error("Error during self repair #{inspect(error)} #{inspect(message)}") + {:cont, :error} end - else - Logger.info("Synchronization skipped (before first summary date)") - end + end) end @doc """ From 8df39b454701d833cc9e509e469845bf0d1aadae Mon Sep 17 00:00:00 2001 From: Neylix Date: Tue, 13 Jun 2023 13:20:15 +0200 Subject: [PATCH 3/3] Do not load node in memtable during bootstrap --- lib/archethic/bootstrap.ex | 95 ++++---- lib/archethic/bootstrap/sync.ex | 46 ++-- .../bootstrap/transaction_handler.ex | 14 +- lib/archethic/p2p.ex | 37 ++-- lib/archethic/p2p/mem_table_loader.ex | 17 +- .../p2p/message/bootstrapping_nodes.ex | 21 +- .../p2p/message/get_bootstraping_nodes.ex | 3 +- lib/archethic/p2p/message/list_nodes.ex | 27 ++- lib/archethic/self_repair.ex | 13 +- lib/archethic/self_repair/network_chain.ex | 2 +- lib/archethic/self_repair/scheduler.ex | 21 +- lib/archethic/self_repair/sync.ex | 10 +- test/archethic/bootstrap/sync_test.exs | 60 +++--- .../bootstrap/transaction_handler_test.exs | 3 +- test/archethic/bootstrap_test.exs | 203 +----------------- test/archethic/p2p/messages_test.exs | 39 +++- test/archethic/self_repair/sync_test.exs | 11 +- 17 files changed, 241 insertions(+), 381 deletions(-) diff --git a/lib/archethic/bootstrap.ex b/lib/archethic/bootstrap.ex index a6d1993c0..702a7ca70 100644 --- a/lib/archethic/bootstrap.ex +++ b/lib/archethic/bootstrap.ex @@ -107,19 +107,31 @@ defmodule Archethic.Bootstrap do P2P.get_geo_patch(ip) end + node_first_public_key = Crypto.first_node_public_key() + + closest_bootstrapping_nodes = + case bootstrapping_seeds do + [%Node{first_public_key: ^node_first_public_key}] -> + bootstrapping_seeds + + nodes -> + Enum.each(nodes, &P2P.connect_node/1) + {:ok, closest_nodes} = Sync.get_closest_nodes_and_renew_seeds(nodes, network_patch) + closest_nodes + end + if should_bootstrap?(ip, port, http_port, transport, last_sync_date) do start_bootstrap( ip, port, http_port, transport, - bootstrapping_seeds, - network_patch, + closest_bootstrapping_nodes, reward_address ) end - post_bootstrap() + post_bootstrap(closest_bootstrapping_nodes) Logger.info("Bootstrapping finished!") end @@ -148,14 +160,13 @@ defmodule Archethic.Bootstrap do port, http_port, transport, - bootstrapping_seeds, - network_patch, + closest_bootstrapping_nodes, configured_reward_address ) do Logger.info("Bootstrapping starting") cond do - Sync.should_initialize_network?(bootstrapping_seeds) -> + Sync.should_initialize_network?(closest_bootstrapping_nodes) -> Logger.info("This node should initialize the network!!") Logger.debug("Create first node transaction") @@ -180,8 +191,7 @@ defmodule Archethic.Bootstrap do port, http_port, transport, - network_patch, - bootstrapping_seeds, + closest_bootstrapping_nodes, configured_reward_address ) @@ -201,21 +211,20 @@ defmodule Archethic.Bootstrap do port, http_port, transport, - network_patch, - bootstrapping_seeds, + closest_bootstrapping_nodes, last_reward_address ) end end - defp post_bootstrap() do + defp post_bootstrap(closest_bootstrapping_nodes) do last_sync_date = SelfRepair.last_sync_date() if SelfRepair.missed_sync?(last_sync_date) do Logger.info("Synchronization started") # Always load the current node list to have the current view for downloading transaction - :ok = Sync.load_node_list() - :ok = SelfRepair.bootstrap_sync(last_sync_date) + {:ok, current_nodes} = Sync.connect_current_node(closest_bootstrapping_nodes) + :ok = SelfRepair.bootstrap_sync(last_sync_date, current_nodes) Logger.info("Synchronization finished") end @@ -236,23 +245,14 @@ defmodule Archethic.Bootstrap do port, http_port, transport, - patch, - bootstrapping_seeds, + closest_bootstrapping_nodes, configured_reward_address ) do - Enum.each(bootstrapping_seeds, &P2P.add_and_connect_node/1) - - {:ok, closest_nodes} = Sync.get_closest_nodes_and_renew_seeds(bootstrapping_seeds, patch) - - closest_nodes = - closest_nodes - |> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key())) - # In case node had lose it's DB, we ask the network if the node chain already exists {:ok, length} = Crypto.first_node_public_key() |> Crypto.derive_address() - |> TransactionChain.fetch_size_remotely(closest_nodes) + |> TransactionChain.fetch_size_remotely(closest_bootstrapping_nodes) Crypto.set_node_key_index(length) @@ -260,10 +260,10 @@ defmodule Archethic.Bootstrap do if length > 0 do {:ok, last_address} = Crypto.derive_address(Crypto.first_node_public_key()) - |> TransactionChain.fetch_last_address_remotely(closest_nodes) + |> TransactionChain.fetch_last_address_remotely(closest_bootstrapping_nodes) {:ok, %Transaction{data: %TransactionData{content: content}}} = - TransactionChain.fetch_transaction_remotely(last_address, closest_nodes) + TransactionChain.fetch_transaction_remotely(last_address, closest_bootstrapping_nodes) {:ok, _ip, _p2p_port, _http_port, _transport, last_reward_address, _origin_public_key, _key_certificate} = Node.decode_transaction_content(content) @@ -276,41 +276,26 @@ defmodule Archethic.Bootstrap do tx = TransactionHandler.create_node_transaction(ip, port, http_port, transport, reward_address) - {:ok, validated_tx} = TransactionHandler.send_transaction(tx, closest_nodes) + {:ok, validated_tx} = TransactionHandler.send_transaction(tx, closest_bootstrapping_nodes) - :ok = Sync.load_storage_nonce(closest_nodes) + :ok = Sync.load_storage_nonce(closest_bootstrapping_nodes) - Replication.sync_transaction_chain(validated_tx, closest_nodes) + Replication.sync_transaction_chain(validated_tx, closest_bootstrapping_nodes) end - defp update_node(ip, port, http_port, transport, patch, bootstrapping_seeds, reward_address) do - case Enum.reject( - bootstrapping_seeds, - &(&1.first_public_key == Crypto.first_node_public_key()) - ) do - [] -> - Logger.warning("Not enough nodes in the network. No node update") - - _ -> - {:ok, closest_nodes} = Sync.get_closest_nodes_and_renew_seeds(bootstrapping_seeds, patch) - - closest_nodes = - closest_nodes - |> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key())) - - tx = - TransactionHandler.create_node_transaction( - ip, - port, - http_port, - transport, - reward_address - ) + defp update_node(ip, port, http_port, transport, closest_bootstrapping_nodes, reward_address) do + tx = + TransactionHandler.create_node_transaction( + ip, + port, + http_port, + transport, + reward_address + ) - {:ok, validated_tx} = TransactionHandler.send_transaction(tx, closest_nodes) + {:ok, validated_tx} = TransactionHandler.send_transaction(tx, closest_bootstrapping_nodes) - Replication.sync_transaction_chain(validated_tx, closest_nodes) - end + Replication.sync_transaction_chain(validated_tx, closest_bootstrapping_nodes) end @doc """ diff --git a/lib/archethic/bootstrap/sync.ex b/lib/archethic/bootstrap/sync.ex index a227dc6ab..d12f1e360 100644 --- a/lib/archethic/bootstrap/sync.ex +++ b/lib/archethic/bootstrap/sync.ex @@ -16,10 +16,10 @@ defmodule Archethic.Bootstrap.Sync do alias Archethic.P2P.Message.NotifyEndOfNodeSync alias Archethic.P2P.Node - alias Archethic.SelfRepair - alias Archethic.SharedSecrets + alias Archethic.TaskSupervisor + alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction @@ -133,15 +133,27 @@ defmodule Archethic.Bootstrap.Sync do @doc """ Fetch and load the nodes list """ - @spec load_node_list() :: :ok | {:error, :network_issue} - def load_node_list() do - case P2P.fetch_nodes_list() do + @spec connect_current_node(closest_nodes :: list(Node.t())) :: + {:ok, list(Node.t())} | {:error, :network_issue} + def connect_current_node(closest_nodes) do + case P2P.fetch_nodes_list(true, closest_nodes) do {:ok, nodes} -> - Enum.each(nodes, &P2P.add_and_connect_node/1) - # After loading all current nodes, we update the p2p view with the last one stored in DB - # to have a proper view for the next beacon summary to self repair - SelfRepair.last_sync_date() |> P2P.reload_last_view() + nodes = + Task.Supervisor.async_stream( + TaskSupervisor, + nodes, + fn node -> + P2P.connect_node(node) + # Wait connection time + Process.sleep(500) + %Node{node | availability_history: <<1::1>>} + end + ) + |> Stream.filter(&match?({:ok, _}, &1)) + |> Enum.map(fn {:ok, node} -> node end) + Logger.info("Node list refreshed") + {:ok, nodes} {:error, :network_issue} -> {:error, :network_issue} @@ -180,17 +192,27 @@ defmodule Archethic.Bootstrap.Sync do {:ok, list(Node.t())} | {:error, :network_issue} def get_closest_nodes_and_renew_seeds([node | rest], patch) do case P2P.send_message(node, %GetBootstrappingNodes{patch: patch}) do - {:ok, %BootstrappingNodes{closest_nodes: closest_nodes, new_seeds: new_seeds}} -> + {:ok, + %BootstrappingNodes{ + closest_nodes: closest_nodes, + new_seeds: new_seeds, + first_enrolled_node: first_enrolled_node + }} -> :ok = P2P.new_bootstrapping_seeds(new_seeds) + P2P.add_and_connect_node(first_enrolled_node) + Logger.info("First enrolled node added into P2P MemTable") + closest_nodes = (new_seeds ++ closest_nodes) |> P2P.distinct_nodes() + |> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key())) |> Task.async_stream(fn node -> - P2P.add_and_connect_node(node) + P2P.connect_node(node) # Wait for connection time Process.sleep(500) - P2P.get_node_info!(node.first_public_key) + # Set node locally available after connection + %Node{node | availability_history: <<1::1>>} end) |> Enum.filter(&match?({:ok, _}, &1)) |> Enum.map(fn {:ok, node} -> node end) diff --git a/lib/archethic/bootstrap/transaction_handler.ex b/lib/archethic/bootstrap/transaction_handler.ex index 4b693f506..70bb919ec 100644 --- a/lib/archethic/bootstrap/transaction_handler.ex +++ b/lib/archethic/bootstrap/transaction_handler.ex @@ -3,8 +3,6 @@ defmodule Archethic.Bootstrap.TransactionHandler do alias Archethic.Crypto - alias Archethic.Election - alias Archethic.P2P alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.NewTransaction @@ -32,7 +30,7 @@ defmodule Archethic.Bootstrap.TransactionHandler do end defp do_send_transaction( - [node | rest], + nodes = [node | rest], tx = %Transaction{address: address, type: type, data: transaction_data} ) do case P2P.send_message(node, %NewTransaction{ @@ -45,15 +43,7 @@ defmodule Archethic.Bootstrap.TransactionHandler do transaction_type: type ) - storage_nodes = - Election.chain_storage_nodes_with_type( - address, - type, - P2P.authorized_and_available_nodes() - ) - |> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key())) - - case Utils.await_confirmation(address, storage_nodes) do + case Utils.await_confirmation(address, nodes) do {:ok, validated_transaction = %Transaction{address: ^address, data: ^transaction_data}} -> {:ok, validated_transaction} diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index e1184be8b..192be7920 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -40,15 +40,19 @@ defmodule Archethic.P2P do def add_and_connect_node(node = %Node{first_public_key: first_public_key}) do :ok = MemTable.add_node(node) node = get_node_info!(first_public_key) - do_connect_node(node) + connect_node(node) end - defp do_connect_node(%Node{ - ip: ip, - port: port, - transport: transport, - first_public_key: first_public_key - }) do + @doc """ + Establish a connection with a node + """ + @spec connect_node(Node.t()) :: :ok + def connect_node(%Node{ + ip: ip, + port: port, + transport: transport, + first_public_key: first_public_key + }) do if first_public_key == Crypto.first_node_public_key() do :ok else @@ -57,12 +61,6 @@ defmodule Archethic.P2P do end end - @doc """ - Reload last P2P view from DB - """ - @spec reload_last_view(last_sync_date :: DateTime.t() | nil) :: :ok - defdelegate reload_last_view(last_sync_date), to: MemTableLoader, as: :load_p2p_view - @doc """ List the nodes registered. """ @@ -72,8 +70,9 @@ defmodule Archethic.P2P do @doc """ Fetch the list of nodes from close nodes """ - @spec fetch_nodes_list() :: {:ok, list(Node.t())} | {:error, :network_issue} - def fetch_nodes_list() do + @spec fetch_nodes_list(authorized_and_available? :: boolean(), node_list :: list(Node.t())) :: + {:ok, list(Node.t())} | {:error, :network_issue} + def fetch_nodes_list(authorized_and_available?, nodes) do last_updated_nodes = fn new_node = %Node{ first_public_key: public_key, @@ -100,7 +99,11 @@ defmodule Archethic.P2P do %NodeList{nodes: nodes} end - case quorum_read(authorized_and_available_nodes(), %ListNodes{}, conflict_resolver) do + case quorum_read( + nodes, + %ListNodes{authorized_and_available?: authorized_and_available?}, + conflict_resolver + ) do {:ok, %NodeList{nodes: nodes}} -> {:ok, nodes} @@ -570,7 +573,7 @@ defmodule Archethic.P2P do previous_public_key |> TransactionChain.get_first_public_key() |> get_node_info!() - |> do_connect_node() + |> connect_node() end def load_transaction(tx), do: MemTableLoader.load_transaction(tx) diff --git a/lib/archethic/p2p/mem_table_loader.ex b/lib/archethic/p2p/mem_table_loader.ex index 825b12c5f..bef804bd1 100644 --- a/lib/archethic/p2p/mem_table_loader.ex +++ b/lib/archethic/p2p/mem_table_loader.ex @@ -85,14 +85,7 @@ defmodule Archethic.P2P.MemTableLoader do P2P.set_node_average_availability(node_key, 1.0) _ -> - # We want to reload the previous beacon chain summary information - # if the node haven't been disconnected for a significant time (one self-repair cycle) - # if the node was disconnected for long time, then we don't load the previous view, as it's obsolete - missed_sync? = SelfRepair.missed_sync?(last_sync_date) - - Enum.each(p2p_summaries, fn summary -> - load_p2p_summary(summary, missed_sync?) - end) + Enum.each(p2p_summaries, &load_p2p_summary/1) end end @@ -197,15 +190,11 @@ defmodule Archethic.P2P.MemTableLoader do defp first_node_change?(_, _), do: false defp load_p2p_summary( - {node_public_key, available?, avg_availability, availability_update, network_patch}, - missed_sync? + {node_public_key, available?, avg_availability, availability_update, network_patch} ) do if available? do P2P.set_node_globally_synced(node_public_key) - - unless missed_sync? do - P2P.set_node_globally_available(node_public_key, availability_update) - end + P2P.set_node_globally_available(node_public_key, availability_update) end MemTable.update_node_average_availability(node_public_key, avg_availability) diff --git a/lib/archethic/p2p/message/bootstrapping_nodes.ex b/lib/archethic/p2p/message/bootstrapping_nodes.ex index 1ec9b76a9..6a40760bf 100644 --- a/lib/archethic/p2p/message/bootstrapping_nodes.ex +++ b/lib/archethic/p2p/message/bootstrapping_nodes.ex @@ -4,18 +4,23 @@ defmodule Archethic.P2P.Message.BootstrappingNodes do This message is used during the node bootstrapping """ - defstruct new_seeds: [], closest_nodes: [] + defstruct [:first_enrolled_node, new_seeds: [], closest_nodes: []] alias Archethic.P2P.Node alias Archethic.Utils.VarInt @type t() :: %__MODULE__{ new_seeds: list(Node.t()), - closest_nodes: list(Node.t()) + closest_nodes: list(Node.t()), + first_enrolled_node: Node.t() } @spec serialize(t()) :: bitstring() - def serialize(%__MODULE__{new_seeds: new_seeds, closest_nodes: closest_nodes}) do + def serialize(%__MODULE__{ + new_seeds: new_seeds, + closest_nodes: closest_nodes, + first_enrolled_node: first_enrolled_node + }) do new_seeds_bin = new_seeds |> Enum.map(&Node.serialize/1) @@ -30,8 +35,11 @@ defmodule Archethic.P2P.Message.BootstrappingNodes do encoded_closest_nodes_length = length(closest_nodes) |> VarInt.from_value() + first_enrolled_node_bin = Node.serialize(first_enrolled_node) + <> + encoded_closest_nodes_length::binary, closest_nodes_bin::bitstring, + first_enrolled_node_bin::bitstring>> end @spec deserialize(bitstring()) :: {t(), bitstring} @@ -42,9 +50,12 @@ defmodule Archethic.P2P.Message.BootstrappingNodes do {nb_closest_nodes, rest} = rest |> VarInt.get_value() {closest_nodes, rest} = deserialize_node_list(rest, nb_closest_nodes, []) + {first_enrolled_node, rest} = Node.deserialize(rest) + {%__MODULE__{ new_seeds: new_seeds, - closest_nodes: closest_nodes + closest_nodes: closest_nodes, + first_enrolled_node: first_enrolled_node }, rest} end diff --git a/lib/archethic/p2p/message/get_bootstraping_nodes.ex b/lib/archethic/p2p/message/get_bootstraping_nodes.ex index 8957f674e..444922667 100644 --- a/lib/archethic/p2p/message/get_bootstraping_nodes.ex +++ b/lib/archethic/p2p/message/get_bootstraping_nodes.ex @@ -28,7 +28,8 @@ defmodule Archethic.P2P.Message.GetBootstrappingNodes do %BootstrappingNodes{ new_seeds: Enum.take_random(top_nodes, 5), - closest_nodes: closest_nodes + closest_nodes: closest_nodes, + first_enrolled_node: P2P.get_first_enrolled_node() } end diff --git a/lib/archethic/p2p/message/list_nodes.ex b/lib/archethic/p2p/message/list_nodes.ex index 0e80c2c96..90ae83278 100644 --- a/lib/archethic/p2p/message/list_nodes.ex +++ b/lib/archethic/p2p/message/list_nodes.ex @@ -6,22 +6,27 @@ defmodule Archethic.P2P.Message.ListNodes do alias Archethic.P2P alias Archethic.P2P.Message.NodeList - defstruct [] + defstruct [:authorized_and_available?] - @type t :: %__MODULE__{} + @type t :: %__MODULE__{ + authorized_and_available?: boolean() + } @spec process(__MODULE__.t(), Crypto.key()) :: NodeList.t() - def process(%__MODULE__{}, _) do - %NodeList{nodes: P2P.list_nodes()} - end + def process(%__MODULE__{authorized_and_available?: false}, _), + do: %NodeList{nodes: P2P.list_nodes()} + + def process(%__MODULE__{authorized_and_available?: true}, _), + do: %NodeList{nodes: P2P.authorized_and_available_nodes()} @spec serialize(t()) :: bitstring() - def serialize(%__MODULE__{}), do: <<>> + def serialize(%__MODULE__{authorized_and_available?: false}), do: <<0::8>> + def serialize(%__MODULE__{authorized_and_available?: true}), do: <<1::8>> @spec deserialize(bitstring()) :: {t(), bitstring} - def deserialize(<>), - do: { - %__MODULE__{}, - rest - } + def deserialize(<<0::8, rest::bitstring>>), + do: {%__MODULE__{authorized_and_available?: false}, rest} + + def deserialize(<<1::8, rest::bitstring>>), + do: {%__MODULE__{authorized_and_available?: true}, rest} end diff --git a/lib/archethic/self_repair.ex b/lib/archethic/self_repair.ex index 2907c4d3a..e8056bd9b 100755 --- a/lib/archethic/self_repair.ex +++ b/lib/archethic/self_repair.ex @@ -45,9 +45,10 @@ defmodule Archethic.SelfRepair do @doc """ Start the bootstrap's synchronization process using the last synchronization date """ - @spec bootstrap_sync(last_sync_date :: DateTime.t()) :: :ok | :error - def bootstrap_sync(last_sync_date = %DateTime{}) do - case sync_with_retry(last_sync_date) do + @spec bootstrap_sync(last_sync_date :: DateTime.t(), download_nodes :: list(Node.t())) :: + :ok | :error + def bootstrap_sync(last_sync_date, download_nodes) do + case sync_with_retry(last_sync_date, download_nodes) do :ok -> # At the end of self repair, if a new beacon summary as been created # we run bootstrap_sync again until the last beacon summary is loaded @@ -57,7 +58,7 @@ defmodule Archethic.SelfRepair do |> BeaconChain.previous_summary_time() |> DateTime.compare(last_sync_date) do :gt -> - bootstrap_sync(last_sync_date) + bootstrap_sync(last_sync_date, download_nodes) _ -> :ok @@ -70,11 +71,11 @@ defmodule Archethic.SelfRepair do end end - defp sync_with_retry(last_sync_date) do + defp sync_with_retry(last_sync_date, download_nodes) do 0..@max_retry_count |> Enum.reduce_while(:error, fn _, _ -> try do - :ok = Sync.load_missed_transactions(last_sync_date) + :ok = Sync.load_missed_transactions(last_sync_date, download_nodes) {:halt, :ok} catch error, message -> diff --git a/lib/archethic/self_repair/network_chain.ex b/lib/archethic/self_repair/network_chain.ex index 1813bfb7d..239362d12 100644 --- a/lib/archethic/self_repair/network_chain.ex +++ b/lib/archethic/self_repair/network_chain.ex @@ -50,7 +50,7 @@ defmodule Archethic.SelfRepair.NetworkChain do def synchronous_resync(:node) do :telemetry.execute([:archethic, :self_repair, :resync], %{count: 1}, %{network_chain: :node}) - case P2P.fetch_nodes_list() do + case P2P.fetch_nodes_list(false, P2P.authorized_and_available_nodes()) do {:ok, nodes} -> nodes_to_resync = Enum.filter(nodes, &node_require_resync?/1) diff --git a/lib/archethic/self_repair/scheduler.ex b/lib/archethic/self_repair/scheduler.ex index cebf0162a..86987d546 100644 --- a/lib/archethic/self_repair/scheduler.ex +++ b/lib/archethic/self_repair/scheduler.ex @@ -5,8 +5,18 @@ defmodule Archethic.SelfRepair.Scheduler do """ use GenServer @vsn Mix.Project.config()[:version] - alias Archethic - alias Archethic.{P2P, SelfRepair.Sync, TaskSupervisor, Utils, PubSub} + + alias Archethic.BeaconChain + + alias Archethic.P2P + + alias Archethic.PubSub + + alias Archethic.SelfRepair.Sync + + alias Archethic.TaskSupervisor + + alias Archethic.Utils alias Archethic.Bootstrap.Sync, as: BootstrapSync alias Crontab.CronExpression.Parser, as: CronParser @@ -87,7 +97,12 @@ defmodule Archethic.SelfRepair.Scheduler do # Loading transactions can take a lot of time to be achieve and can overpass an epoch. # So to avoid missing a beacon summary epoch, we save the starting date and update the last sync date with it # at the end of loading (in case there is a crash during self repair) - Sync.load_missed_transactions(last_sync_date) + download_nodes = + DateTime.utc_now() + |> BeaconChain.previous_summary_time() + |> P2P.authorized_and_available_nodes(true) + + Sync.load_missed_transactions(last_sync_date, download_nodes) end) {:noreply, state, :hibernate} diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 99ed8e6df..4340ee90b 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -102,9 +102,9 @@ defmodule Archethic.SelfRepair.Sync do Once retrieved, the transactions are downloaded and stored if not exists locally """ - @spec load_missed_transactions(last_sync_date :: DateTime.t()) :: + @spec load_missed_transactions(last_sync_date :: DateTime.t(), download_nodes :: list(Node.t())) :: :ok | {:error, :unreachable_nodes} - def load_missed_transactions(last_sync_date = %DateTime{}) do + def load_missed_transactions(last_sync_date, download_nodes) do last_summary_time = BeaconChain.previous_summary_time(DateTime.utc_now()) if DateTime.compare(last_summary_time, last_sync_date) == :gt do @@ -112,7 +112,7 @@ defmodule Archethic.SelfRepair.Sync do "Fetch missed transactions from last sync date: #{DateTime.to_string(last_sync_date)}" ) - do_load_missed_transactions(last_sync_date, last_summary_time) + do_load_missed_transactions(last_sync_date, last_summary_time, download_nodes) else Logger.info("Already synchronized for #{DateTime.to_string(last_sync_date)}") @@ -121,11 +121,9 @@ defmodule Archethic.SelfRepair.Sync do end end - defp do_load_missed_transactions(last_sync_date, last_summary_time) do + defp do_load_missed_transactions(last_sync_date, last_summary_time, download_nodes) do start = System.monotonic_time() - download_nodes = P2P.authorized_and_available_nodes(last_summary_time, true) - # Process first the old aggregates fetch_summaries_aggregates(last_sync_date, last_summary_time, download_nodes) |> Enum.each(&process_summary_aggregate(&1, download_nodes)) diff --git a/test/archethic/bootstrap/sync_test.exs b/test/archethic/bootstrap/sync_test.exs index 1b0b51e4b..4a3a3da48 100644 --- a/test/archethic/bootstrap/sync_test.exs +++ b/test/archethic/bootstrap/sync_test.exs @@ -10,6 +10,7 @@ defmodule Archethic.Bootstrap.SyncTest do alias Archethic.Crypto alias Archethic.P2P + alias Archethic.P2P.Client alias Archethic.P2P.Message.GetTransactionChainLength alias Archethic.P2P.Message.TransactionChainLength alias Archethic.P2P.Message.EncryptedStorageNonce @@ -46,6 +47,7 @@ defmodule Archethic.Bootstrap.SyncTest do @moduletag :capture_log import Mox + import Mock setup do MockClient @@ -332,7 +334,13 @@ defmodule Archethic.Bootstrap.SyncTest do end end - test "load_node_list/0 should request node list from the closest nodes" do + test_with_mock "connect_current_node/1 should request node list from the closest nodes and connect to them", + Client, + [:passthrough], + new_connection: fn _, _, _, public_key -> + P2P.MemTable.increase_node_availability(public_key) + {:ok, make_ref()} + end do node = %Node{ ip: {80, 10, 101, 202}, port: 4390, @@ -347,7 +355,7 @@ defmodule Archethic.Bootstrap.SyncTest do network_patch: "AAA" } - :ok = P2P.add_and_connect_node(node) + :ok = P2P.connect_node(node) first_public_key = Crypto.first_node_public_key() @@ -358,43 +366,33 @@ defmodule Archethic.Bootstrap.SyncTest do first_public_key: first_public_key, last_public_key: Crypto.last_node_public_key(), enrollment_date: DateTime.utc_now(), - authorized?: false, + authorized?: true, + available?: true, + availability_history: <<1::1>>, network_patch: "AAA" } - :ok = P2P.add_and_connect_node(node2) + node3 = %Node{ + ip: {127, 0, 0, 1}, + port: 3000, + http_port: 4000, + first_public_key: "key2", + last_public_key: "key2", + authorized?: true, + availability_history: <<1::1>>, + available?: true + } MockClient |> stub(:send_message, fn - _, %ListNodes{}, _ -> - {:ok, - %NodeList{ - nodes: [ - %Node{ - ip: {127, 0, 0, 1}, - port: 3000, - http_port: 4000, - first_public_key: "key2", - last_public_key: "key2" - } - ] - }} + _, %ListNodes{authorized_and_available?: true}, _ -> + {:ok, %NodeList{nodes: [node, node2, node3]}} end) - assert :ok = Sync.load_node_list() - - assert [ - %Node{first_public_key: ^first_public_key}, - %Node{first_public_key: "key1"}, - %Node{ - ip: {127, 0, 0, 1}, - port: 3000, - http_port: 4000, - first_public_key: "key2", - last_public_key: "key2", - availability_history: <<2::2>> - } - ] = P2P.list_nodes() + assert {:ok, [^node, ^node2, ^node3]} = Sync.connect_current_node([node]) + + assert_called_exactly(Client.new_connection(:_, :_, :_, "key1"), 2) + assert_called(Client.new_connection(:_, :_, :_, "key2")) end test "load_storage_nonce/1 should fetch the storage nonce, decrypt it with the node key" do diff --git a/test/archethic/bootstrap/transaction_handler_test.exs b/test/archethic/bootstrap/transaction_handler_test.exs index dfa9e5eba..f5529efe3 100644 --- a/test/archethic/bootstrap/transaction_handler_test.exs +++ b/test/archethic/bootstrap/transaction_handler_test.exs @@ -46,7 +46,8 @@ defmodule Archethic.Bootstrap.TransactionHandlerTest do available?: true, authorized?: true, authorization_date: DateTime.utc_now() |> DateTime.add(-10), - enrollment_date: DateTime.utc_now() + enrollment_date: DateTime.utc_now(), + availability_history: <<1::1>> } :ok = P2P.add_and_connect_node(node) diff --git a/test/archethic/bootstrap_test.exs b/test/archethic/bootstrap_test.exs index 4f3be19d1..56b03abdd 100644 --- a/test/archethic/bootstrap_test.exs +++ b/test/archethic/bootstrap_test.exs @@ -16,7 +16,6 @@ defmodule Archethic.BootstrapTest do alias Archethic.P2P.Message.{ BootstrappingNodes, EncryptedStorageNonce, - GenesisAddress, GetBootstrappingNodes, GetGenesisAddress, GetLastTransactionAddress, @@ -42,7 +41,6 @@ defmodule Archethic.BootstrapTest do alias Archethic.Reward.MemTablesLoader, as: RewardTableLoader alias Archethic.SelfRepair.Scheduler, as: SelfRepairScheduler - alias Archethic.SelfRepair.NetworkChain alias Archethic.SharedSecrets @@ -50,7 +48,6 @@ defmodule Archethic.BootstrapTest do alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.Transaction.ValidationStamp alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations - alias Archethic.TransactionFactory import Mox import Mock @@ -274,7 +271,8 @@ defmodule Archethic.BootstrapTest do ], closest_nodes: [ Enum.at(nodes, 1) - ] + ], + first_enrolled_node: Enum.at(nodes, 0) }} _, %GetStorageNonce{}, _ -> @@ -445,201 +443,4 @@ defmodule Archethic.BootstrapTest do Process.sleep(100) end end - - describe "synchronous_resync/1 nss_chain" do - setup do - p2p_context() - - curr_time = DateTime.utc_now() - - txn0 = - TransactionFactory.create_network_tx(:node_shared_secrets, - index: 0, - timestamp: curr_time |> DateTime.add(-14_400, :second), - prev_txn: [] - ) - - txn1 = - TransactionFactory.create_network_tx(:node_shared_secrets, - index: 1, - timestamp: curr_time |> DateTime.add(-14_400, :second), - prev_txn: [txn0] - ) - - txn2 = - TransactionFactory.create_network_tx(:node_shared_secrets, - index: 2, - timestamp: curr_time |> DateTime.add(-7_200, :second), - prev_txn: [txn1] - ) - - txn3 = - TransactionFactory.create_network_tx(:node_shared_secrets, - index: 3, - timestamp: curr_time |> DateTime.add(-3_600, :second), - prev_txn: [txn2] - ) - - txn4 = - TransactionFactory.create_network_tx(:node_shared_secrets, - index: 4, - timestamp: curr_time, - prev_txn: [txn3] - ) - - :persistent_term.put(:node_shared_secrets_gen_addr, txn0.address) - %{txn0: txn0, txn1: txn1, txn2: txn2, txn3: txn3, txn4: txn4} - end - - test "Should return :ok when Genesis Address are not loaded", _nss_chain do - # first time boot no txns exits yet - :persistent_term.put(:node_shared_secrets_gen_addr, nil) - - assert :ok = NetworkChain.synchronous_resync(:node_shared_secrets) - end - - test "Should return :ok when last address match (locally and remotely)", nss_chain do - # node restart but within renewal interval - me = self() - addr0 = nss_chain.txn0.address - - MockDB - |> stub(:get_last_chain_address, fn ^addr0 -> - send(me, :local_last_addr_request) - {nss_chain.txn4.address, DateTime.utc_now()} - end) - - MockClient - |> stub(:send_message, fn - _, %GetLastTransactionAddress{address: ^addr0}, _ -> - send(me, :remote_last_addr_request) - {:ok, %LastTransactionAddress{address: nss_chain.txn4.address}} - - _, %GetTransaction{}, _ -> - send(me, :fetch_last_txn) - end) - - assert :ok = NetworkChain.synchronous_resync(:node_shared_secrets) - - assert_receive(:local_last_addr_request) - assert_receive(:remote_last_addr_request) - refute_receive(:fetch_last_txn) - end - - test "should Retrieve and Store Network tx's, when last tx's not available", nss_chain do - # scenario nss chain - # addr0 -> addr1 -> addr2 -> addr3 -> addr4 - # node1 => addr0 -> addr1 -> addr2 - # node2 => addr0 -> addr1 -> addr2 -> addr3 -> addr4 - addr0 = nss_chain.txn0.address - addr1 = nss_chain.txn1.address - addr2 = nss_chain.txn2.address - addr3 = nss_chain.txn3.address - addr4 = nss_chain.txn4.address - - me = self() - - now = DateTime.utc_now() - - MockDB - |> stub(:list_chain_addresses, fn - ^addr0 -> [{addr1, now}, {addr2, now}, {addr3, now}, {addr4, now}] - end) - |> stub(:transaction_exists?, fn - ^addr4, _ -> false - ^addr3, _ -> false - ^addr2, _ -> true - ^addr1, _ -> true - end) - |> stub(:write_transaction, fn tx, _ -> - # to know this fx executed or not we use send - send(me, {:write_transaction, tx.address}) - :ok - end) - - MockClient - |> stub(:send_message, fn - _, %GetLastTransactionAddress{address: ^addr0}, _ -> - {:ok, %LastTransactionAddress{address: addr4}} - - _, %GetTransaction{address: ^addr4}, _ -> - {:ok, nss_chain.txn4} - - _, %GetTransaction{address: ^addr3}, _ -> - {:ok, nss_chain.txn3} - - _, %GetTransactionInputs{address: ^addr3}, _ -> - {:ok, %TransactionInputList{inputs: []}} - - _, %GetGenesisAddress{address: ^addr3}, _ -> - {:ok, %GenesisAddress{address: addr0, timestamp: DateTime.utc_now()}} - - _, %GetTransactionChain{address: ^addr3, paging_state: ^addr2}, _ -> - {:ok, - %TransactionList{ - transactions: [nss_chain.txn3, nss_chain.txn4], - more?: false, - paging_state: nil - }} - end) - - assert :ok = NetworkChain.synchronous_resync(:node_shared_secrets) - - # flow - # get_gen_addr(:pers_term) -> resolve_last_address -> get_last_address - # | - # validate_and_store_transaction_chain <- fetch_transaction_remotely - # | - # transaction_exists? -> fetch_context(tx) -> get_last_txn (db then -> remote check) - # | - # transaction_exists?(prev_txn\tx3) <- stream_previous_chain <- fetch_inputs_remotely - # | - # stream_transaction_chain(addr3/prev-tx) -> fetch_genesis_address_remotely -> - # | - # &TransactionChain.write/1 <- stream_remotely(addr3,addr2) <- get_last_address(locally) - # | - # write_transaction(tx4) -> ingest txn4 - assert_receive({:write_transaction, ^addr3}) - assert_receive({:write_transaction, ^addr4}) - end - - defp p2p_context() do - pb_key3 = Crypto.derive_keypair("key33", 0) |> elem(0) - - SharedSecrets.add_origin_public_key(:software, Crypto.first_node_public_key()) - - coordinator_node = %Node{ - first_public_key: Crypto.first_node_public_key(), - last_public_key: Crypto.last_node_public_key(), - authorized?: true, - available?: true, - authorization_date: DateTime.add(DateTime.utc_now(), -86_400, :second), - geo_patch: "AAA", - network_patch: "AAA", - enrollment_date: DateTime.add(DateTime.utc_now(), -86_400, :second), - reward_address: Crypto.derive_address(Crypto.last_node_public_key()) - } - - storage_nodes = [ - %Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: pb_key3, - last_public_key: pb_key3, - available?: true, - authorized?: true, - geo_patch: "BBB", - network_patch: "BBB", - authorization_date: DateTime.add(DateTime.utc_now(), -86_400, :second), - reward_address: Crypto.derive_address(pb_key3), - enrollment_date: DateTime.add(DateTime.utc_now(), -86_400, :second) - } - ] - - Enum.each(storage_nodes, &P2P.add_and_connect_node(&1)) - - # P2P.add_and_connect_node(welcome_node) - P2P.add_and_connect_node(coordinator_node) - end - end end diff --git a/test/archethic/p2p/messages_test.exs b/test/archethic/p2p/messages_test.exs index 4a37d5ddc..47020cfe2 100644 --- a/test/archethic/p2p/messages_test.exs +++ b/test/archethic/p2p/messages_test.exs @@ -85,8 +85,14 @@ defmodule Archethic.P2P.MessageTest do end test "ListNodes message" do - assert %ListNodes{} = - %ListNodes{} + assert %ListNodes{authorized_and_available?: true} = + %ListNodes{authorized_and_available?: true} + |> Message.encode() + |> Message.decode() + |> elem(0) + + assert %ListNodes{authorized_and_available?: false} = + %ListNodes{authorized_and_available?: false} |> Message.encode() |> Message.decode() |> elem(0) @@ -716,7 +722,34 @@ defmodule Archethic.P2P.MessageTest do <<0, 0, 76, 168, 99, 61, 84, 206, 158, 226, 212, 161, 60, 62, 55, 101, 249, 142, 174, 178, 157, 241, 148, 35, 19, 177, 109, 40, 224, 179, 31, 66, 129, 4>> } - ] + ], + first_enrolled_node: %Node{ + ip: {127, 0, 0, 1}, + port: 3000, + http_port: 4000, + first_public_key: + <<0, 0, 182, 67, 168, 252, 227, 203, 142, 164, 142, 248, 159, 209, 249, 247, 86, 64, + 92, 224, 91, 182, 122, 49, 209, 169, 96, 111, 219, 204, 57, 250, 59, 226>>, + last_public_key: + <<0, 0, 182, 67, 168, 252, 227, 203, 142, 164, 142, 248, 159, 209, 249, 247, 86, 64, + 92, 224, 91, 182, 122, 49, 209, 169, 96, 111, 219, 204, 57, 250, 59, 226>>, + geo_patch: "FA9", + network_patch: "AVC", + available?: true, + average_availability: 0.8, + enrollment_date: ~U[2020-06-26 08:36:11Z], + authorization_date: ~U[2020-06-26 08:36:11Z], + authorized?: true, + reward_address: + <<0, 0, 163, 237, 233, 93, 14, 241, 241, 8, 144, 218, 105, 16, 138, 243, 223, 17, 182, + 87, 9, 7, 53, 146, 174, 125, 5, 244, 42, 35, 209, 142, 24, 164>>, + last_address: + <<0, 0, 165, 32, 187, 102, 112, 133, 38, 17, 232, 54, 228, 173, 254, 94, 179, 32, 173, + 88, 122, 234, 88, 139, 82, 26, 113, 42, 8, 183, 190, 163, 221, 112>>, + origin_public_key: + <<0, 0, 76, 168, 99, 61, 84, 206, 158, 226, 212, 161, 60, 62, 55, 101, 249, 142, 174, + 178, 157, 241, 148, 35, 19, 177, 109, 40, 224, 179, 31, 66, 129, 4>> + } } assert msg == diff --git a/test/archethic/self_repair/sync_test.exs b/test/archethic/self_repair/sync_test.exs index b98b7f033..59d74a0b7 100644 --- a/test/archethic/self_repair/sync_test.exs +++ b/test/archethic/self_repair/sync_test.exs @@ -236,7 +236,11 @@ defmodule Archethic.SelfRepair.SyncTest do MockDB |> stub(:register_stats, fn _, _, _, _ -> :ok end) - assert :ok = Sync.load_missed_transactions(DateTime.utc_now() |> DateTime.add(-86_400)) + assert :ok = + Sync.load_missed_transactions( + DateTime.utc_now() |> DateTime.add(-86_400), + P2P.authorized_and_available_nodes() + ) assert_received :storage end @@ -403,7 +407,10 @@ defmodule Archethic.SelfRepair.SyncTest do {:ok, %NotFound{}} end) - Sync.load_missed_transactions(DateTime.utc_now() |> DateTime.add(-1, :hour)) + Sync.load_missed_transactions( + DateTime.utc_now() |> DateTime.add(-1, :hour), + P2P.authorized_and_available_nodes() + ) assert_receive {:new_replication_attestation, ^attestation2} assert_receive :should_request