From 87e1114a20cc280100ac82b9d5d7ec448c9ebc98 Mon Sep 17 00:00:00 2001 From: Neylix Date: Wed, 8 Feb 2023 15:59:54 +0100 Subject: [PATCH 1/6] Always load actual node view to ensure download nodes are correct --- lib/archethic/bootstrap.ex | 3 ++- lib/archethic/bootstrap/sync.ex | 18 +++++++++++------- test/archethic/bootstrap/sync_test.exs | 9 ++++++--- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/lib/archethic/bootstrap.ex b/lib/archethic/bootstrap.ex index adf82dff3..938735b01 100644 --- a/lib/archethic/bootstrap.ex +++ b/lib/archethic/bootstrap.ex @@ -203,6 +203,8 @@ defmodule Archethic.Bootstrap do defp post_bootstrap(opts) do if Keyword.get(opts, :sync?, true) do Logger.info("Synchronization started") + # Always load the current node list to have the current view for downloading transaction + :ok = Sync.load_node_list() :ok = SelfRepair.bootstrap_sync(SelfRepair.last_sync_date()) Logger.info("Synchronization finished") end @@ -280,7 +282,6 @@ defmodule Archethic.Bootstrap do :ok = TransactionHandler.send_transaction(tx, closest_nodes) :ok = Sync.load_storage_nonce(closest_nodes) - :ok = Sync.load_node_list(closest_nodes) end defp update_node(ip, port, http_port, transport, patch, bootstrapping_seeds, reward_address) do diff --git a/lib/archethic/bootstrap/sync.ex b/lib/archethic/bootstrap/sync.ex index f81b11635..202376586 100644 --- a/lib/archethic/bootstrap/sync.ex +++ b/lib/archethic/bootstrap/sync.ex @@ -133,20 +133,24 @@ defmodule Archethic.Bootstrap.Sync do @doc """ Fetch and load the nodes list """ - @spec load_node_list(list(Node.t())) :: :ok | {:error, :network_issue} - def load_node_list([node | rest]) do - case P2P.send_message(node, %ListNodes{}) do + @spec load_node_list() :: :ok | {:error, :network_issue} + def load_node_list() do + current_nodes = P2P.authorized_and_available_nodes() + + conflict_resolver = fn results -> + Enum.max_by(results, fn %NodeList{nodes: nodes} -> length(nodes) end) + end + + case P2P.quorum_read(current_nodes, %ListNodes{}, conflict_resolver) do {:ok, %NodeList{nodes: nodes}} -> Enum.each(nodes, &P2P.add_and_connect_node/1) Logger.info("Node list refreshed") - {:error, _} -> - load_node_list(rest) + error -> + error end end - def load_node_list([]), do: {:error, :network_issue} - @doc """ Fetch and load the storage nonce """ diff --git a/test/archethic/bootstrap/sync_test.exs b/test/archethic/bootstrap/sync_test.exs index 0bd4c34b5..f19f6414b 100644 --- a/test/archethic/bootstrap/sync_test.exs +++ b/test/archethic/bootstrap/sync_test.exs @@ -332,14 +332,17 @@ defmodule Archethic.Bootstrap.SyncTest do end end - test "load_node_list/1 should request node list from the closest nodes" do + test "load_node_list/0 should request node list from the closest nodes" do node = %Node{ ip: {80, 10, 101, 202}, port: 4390, http_port: 4000, first_public_key: "key1", last_public_key: "key1", - availability_history: <<1::1>> + availability_history: <<1::1>>, + authorized?: true, + available?: true, + authorization_date: DateTime.utc_now() } :ok = P2P.add_and_connect_node(node) @@ -361,7 +364,7 @@ defmodule Archethic.Bootstrap.SyncTest do }} end) - assert :ok = Sync.load_node_list([node]) + assert :ok = Sync.load_node_list() assert [ node, From 1f220150c4cea58ab9c964ae1bba71bf660d5431 Mon Sep 17 00:00:00 2001 From: Neylix Date: Mon, 6 Mar 2023 11:00:00 +0100 Subject: [PATCH 2/6] Reload last stored P2P after loading current nodes --- lib/archethic/bootstrap/sync.ex | 5 +++++ lib/archethic/p2p.ex | 6 ++++++ lib/archethic/p2p/mem_table_loader.ex | 5 +++-- test/archethic/bootstrap/sync_test.exs | 24 +++++++++++++++++++++--- 4 files changed, 35 insertions(+), 5 deletions(-) diff --git a/lib/archethic/bootstrap/sync.ex b/lib/archethic/bootstrap/sync.ex index 202376586..e0884ba97 100644 --- a/lib/archethic/bootstrap/sync.ex +++ b/lib/archethic/bootstrap/sync.ex @@ -18,6 +18,8 @@ defmodule Archethic.Bootstrap.Sync do alias Archethic.P2P.Message.NotifyEndOfNodeSync alias Archethic.P2P.Node + alias Archethic.SelfRepair + alias Archethic.SharedSecrets alias Archethic.TransactionChain @@ -144,6 +146,9 @@ defmodule Archethic.Bootstrap.Sync do case P2P.quorum_read(current_nodes, %ListNodes{}, conflict_resolver) do {:ok, %NodeList{nodes: nodes}} -> Enum.each(nodes, &P2P.add_and_connect_node/1) + # After loading all current nodes, we update the p2p view with the last one stored in DB + # to have a proper view for the next beacon summary to self repair + SelfRepair.last_sync_date() |> P2P.reload_last_view() Logger.info("Node list refreshed") error -> diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index f3d7bf879..92490507b 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -54,6 +54,12 @@ defmodule Archethic.P2P do end end + @doc """ + Reload last P2P view from DB + """ + @spec reload_last_view(last_sync_date :: DateTime.t() | nil) :: :ok + defdelegate reload_last_view(last_sync_date), to: MemTableLoader, as: :load_p2p_view + @doc """ List the nodes registered. """ diff --git a/lib/archethic/p2p/mem_table_loader.ex b/lib/archethic/p2p/mem_table_loader.ex index 6c3f2da79..65200c5ca 100644 --- a/lib/archethic/p2p/mem_table_loader.ex +++ b/lib/archethic/p2p/mem_table_loader.ex @@ -64,9 +64,10 @@ defmodule Archethic.P2P.MemTableLoader do {:ok, %{}} end - defp load_p2p_view(nil), do: :ok + @spec load_p2p_view(DateTime.t() | nil) :: :ok + def load_p2p_view(nil), do: :ok - defp load_p2p_view(last_repair_time) do + def load_p2p_view(last_repair_time) do next_repair_time = :archethic |> Application.get_env(SelfRepairScheduler, []) diff --git a/test/archethic/bootstrap/sync_test.exs b/test/archethic/bootstrap/sync_test.exs index f19f6414b..1b0b51e4b 100644 --- a/test/archethic/bootstrap/sync_test.exs +++ b/test/archethic/bootstrap/sync_test.exs @@ -342,11 +342,28 @@ defmodule Archethic.Bootstrap.SyncTest do availability_history: <<1::1>>, authorized?: true, available?: true, - authorization_date: DateTime.utc_now() + authorization_date: DateTime.utc_now(), + enrollment_date: DateTime.utc_now(), + network_patch: "AAA" } :ok = P2P.add_and_connect_node(node) + first_public_key = Crypto.first_node_public_key() + + node2 = %Node{ + ip: {127, 0, 0, 1}, + port: 3000, + http_port: 4000, + first_public_key: first_public_key, + last_public_key: Crypto.last_node_public_key(), + enrollment_date: DateTime.utc_now(), + authorized?: false, + network_patch: "AAA" + } + + :ok = P2P.add_and_connect_node(node2) + MockClient |> stub(:send_message, fn _, %ListNodes{}, _ -> @@ -367,7 +384,8 @@ defmodule Archethic.Bootstrap.SyncTest do assert :ok = Sync.load_node_list() assert [ - node, + %Node{first_public_key: ^first_public_key}, + %Node{first_public_key: "key1"}, %Node{ ip: {127, 0, 0, 1}, port: 3000, @@ -376,7 +394,7 @@ defmodule Archethic.Bootstrap.SyncTest do last_public_key: "key2", availability_history: <<2::2>> } - ] == P2P.list_nodes() + ] = P2P.list_nodes() end test "load_storage_nonce/1 should fetch the storage nonce, decrypt it with the node key" do From 87aaa6e3961ad8ce699892f69cdd68fc2c8bf040 Mon Sep 17 00:00:00 2001 From: Neylix Date: Mon, 6 Mar 2023 11:15:31 +0100 Subject: [PATCH 3/6] Set new availability update during self repair --- lib/archethic/p2p/mem_table.ex | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/archethic/p2p/mem_table.ex b/lib/archethic/p2p/mem_table.ex index 939fbce09..89a21ca6d 100644 --- a/lib/archethic/p2p/mem_table.ex +++ b/lib/archethic/p2p/mem_table.ex @@ -714,11 +714,22 @@ defmodule Archethic.P2P.MemTable do when is_binary(first_public_key) do Logger.info("Node globally available", node: Base.encode16(first_public_key)) + # When a node bootstrap, before starting its self repair, it load the current p2p view of the network. + # But then when the self repair occurs, the node needs to know the p2p view at the summary time so we need to update + # the node availability update to match the one it was at the time of the current repairing summary. + # So if a node wants to update a node already available but with a lower availability_update, it means that + # the node is performing a self repair from the bootstrap so we update the date + availability_pos = Keyword.fetch!(@discovery_index_position, :available?) + availability_update_pos = Keyword.fetch!(@discovery_index_position, :availability_update) - unless :ets.lookup_element(@discovery_table, first_public_key, availability_pos) do - availability_update_pos = Keyword.fetch!(@discovery_index_position, :availability_update) + already_available? = :ets.lookup_element(@discovery_table, first_public_key, availability_pos) + + availability_update_from_bootstrap? = + :ets.lookup_element(@discovery_table, first_public_key, availability_update_pos) + |> DateTime.compare(availability_update) == :gt + if not already_available? or availability_update_from_bootstrap? do :ets.update_element(@discovery_table, first_public_key, [ {availability_pos, true}, {availability_update_pos, availability_update} From 6afe88b98539000f5ac7be1c8e076cd76c6a1832 Mon Sep 17 00:00:00 2001 From: Neylix Date: Wed, 8 Mar 2023 15:23:40 +0100 Subject: [PATCH 4/6] Load old summary aggregate before last summary --- lib/archethic/self_repair/sync.ex | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index e67e6907c..4a3361573 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -127,19 +127,17 @@ defmodule Archethic.SelfRepair.Sync do download_nodes = P2P.authorized_and_available_nodes(last_summary_time, true) - summaries_aggregates = - fetch_summaries_aggregates(last_sync_date, last_summary_time, download_nodes) + # Process first the old aggregates + fetch_summaries_aggregates(last_sync_date, last_summary_time, download_nodes) + |> Enum.each(&process_summary_aggregate(&1, download_nodes)) + # Then process the last one to have the last P2P view last_aggregate = BeaconChain.fetch_and_aggregate_summaries(last_summary_time, download_nodes) ensure_download_last_aggregate(last_aggregate, download_nodes) - last_aggregate = - aggregate_with_local_summaries(last_aggregate, last_summary_time) - |> verify_attestations_threshold() - - summaries_aggregates - |> Stream.concat([last_aggregate]) - |> Enum.each(&process_summary_aggregate(&1, download_nodes)) + aggregate_with_local_summaries(last_aggregate, last_summary_time) + |> verify_attestations_threshold() + |> process_summary_aggregate(download_nodes) :telemetry.execute([:archethic, :self_repair], %{duration: System.monotonic_time() - start}) Archethic.Bootstrap.NetworkConstraints.persist_genesis_address() From fb1c700455a440dfcf59f52dbbf9c6733a6dbfe2 Mon Sep 17 00:00:00 2001 From: Neylix Date: Wed, 8 Mar 2023 14:01:09 +0100 Subject: [PATCH 5/6] Fix bootstrap reward address retrieval --- lib/archethic/bootstrap.ex | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/archethic/bootstrap.ex b/lib/archethic/bootstrap.ex index 938735b01..201a8e0c5 100644 --- a/lib/archethic/bootstrap.ex +++ b/lib/archethic/bootstrap.ex @@ -263,10 +263,12 @@ defmodule Archethic.Bootstrap do reward_address = if length > 0 do + {:ok, last_address} = + Crypto.derive_address(Crypto.first_node_public_key()) + |> TransactionChain.fetch_last_address_remotely(closest_nodes) + {:ok, %Transaction{data: %TransactionData{content: content}}} = - TransactionChain.get_last_transaction( - Crypto.derive_address(Crypto.first_node_public_key()) - ) + TransactionChain.fetch_transaction_remotely(last_address, closest_nodes) {:ok, _ip, _p2p_port, _http_port, _transport, last_reward_address, _origin_public_key, _key_certificate} = Node.decode_transaction_content(content) From 515aac49949ba9606dedfd715461e7971319350a Mon Sep 17 00:00:00 2001 From: Neylix Date: Mon, 20 Mar 2023 15:02:57 +0100 Subject: [PATCH 6/6] Update conflict resolver when loading node list --- lib/archethic/bootstrap/sync.ex | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/lib/archethic/bootstrap/sync.ex b/lib/archethic/bootstrap/sync.ex index e0884ba97..0aa36d642 100644 --- a/lib/archethic/bootstrap/sync.ex +++ b/lib/archethic/bootstrap/sync.ex @@ -139,8 +139,30 @@ defmodule Archethic.Bootstrap.Sync do def load_node_list() do current_nodes = P2P.authorized_and_available_nodes() + last_updated_nodes = + fn new_node = %Node{ + first_public_key: public_key, + last_update_date: update_date + }, + acc -> + previous_node = + %Node{last_update_date: previous_update_date} = Map.get(acc, public_key, new_node) + + node = + if DateTime.compare(update_date, previous_update_date) == :gt, + do: new_node, + else: previous_node + + Map.put(acc, public_key, node) + end + conflict_resolver = fn results -> - Enum.max_by(results, fn %NodeList{nodes: nodes} -> length(nodes) end) + nodes = + Enum.flat_map(results, fn %NodeList{nodes: nodes} -> nodes end) + |> Enum.reduce(%{}, fn node, acc -> last_updated_nodes.(node, acc) end) + |> Map.values() + + %NodeList{nodes: nodes} end case P2P.quorum_read(current_nodes, %ListNodes{}, conflict_resolver) do