Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bootstrap node view #940

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ defmodule Archethic.Bootstrap do
defp post_bootstrap(opts) do
if Keyword.get(opts, :sync?, true) do
Logger.info("Synchronization started")
# Always load the current node list to have the current view for downloading transaction
:ok = Sync.load_node_list()
:ok = SelfRepair.bootstrap_sync(SelfRepair.last_sync_date())
Logger.info("Synchronization finished")
end
Expand Down Expand Up @@ -261,10 +263,12 @@ defmodule Archethic.Bootstrap do

reward_address =
if length > 0 do
{:ok, last_address} =
Crypto.derive_address(Crypto.first_node_public_key())
|> TransactionChain.fetch_last_address_remotely(closest_nodes)

{:ok, %Transaction{data: %TransactionData{content: content}}} =
TransactionChain.get_last_transaction(
Crypto.derive_address(Crypto.first_node_public_key())
)
TransactionChain.fetch_transaction_remotely(last_address, closest_nodes)

{:ok, _ip, _p2p_port, _http_port, _transport, last_reward_address, _origin_public_key,
_key_certificate} = Node.decode_transaction_content(content)
Expand All @@ -280,7 +284,6 @@ defmodule Archethic.Bootstrap do
:ok = TransactionHandler.send_transaction(tx, closest_nodes)

:ok = Sync.load_storage_nonce(closest_nodes)
:ok = Sync.load_node_list(closest_nodes)
end

defp update_node(ip, port, http_port, transport, patch, bootstrapping_seeds, reward_address) do
Expand Down
45 changes: 38 additions & 7 deletions lib/archethic/bootstrap/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ defmodule Archethic.Bootstrap.Sync do
alias Archethic.P2P.Message.NotifyEndOfNodeSync
alias Archethic.P2P.Node

alias Archethic.SelfRepair

alias Archethic.SharedSecrets

alias Archethic.TransactionChain
Expand Down Expand Up @@ -133,20 +135,49 @@ defmodule Archethic.Bootstrap.Sync do
@doc """
Fetch and load the nodes list
"""
@spec load_node_list(list(Node.t())) :: :ok | {:error, :network_issue}
def load_node_list([node | rest]) do
case P2P.send_message(node, %ListNodes{}) do
@spec load_node_list() :: :ok | {:error, :network_issue}
def load_node_list() do
Neylix marked this conversation as resolved.
Show resolved Hide resolved
current_nodes = P2P.authorized_and_available_nodes()

last_updated_nodes =
fn new_node = %Node{
first_public_key: public_key,
last_update_date: update_date
},
acc ->
previous_node =
%Node{last_update_date: previous_update_date} = Map.get(acc, public_key, new_node)

node =
if DateTime.compare(update_date, previous_update_date) == :gt,
do: new_node,
else: previous_node

Map.put(acc, public_key, node)
end

conflict_resolver = fn results ->
nodes =
Enum.flat_map(results, fn %NodeList{nodes: nodes} -> nodes end)
|> Enum.reduce(%{}, fn node, acc -> last_updated_nodes.(node, acc) end)
|> Map.values()

%NodeList{nodes: nodes}
end

case P2P.quorum_read(current_nodes, %ListNodes{}, conflict_resolver) do
{:ok, %NodeList{nodes: nodes}} ->
Enum.each(nodes, &P2P.add_and_connect_node/1)
# After loading all current nodes, we update the p2p view with the last one stored in DB
# to have a proper view for the next beacon summary to self repair
SelfRepair.last_sync_date() |> P2P.reload_last_view()
Logger.info("Node list refreshed")

{:error, _} ->
load_node_list(rest)
error ->
error
end
end

def load_node_list([]), do: {:error, :network_issue}

@doc """
Fetch and load the storage nonce
"""
Expand Down
6 changes: 6 additions & 0 deletions lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ defmodule Archethic.P2P do
end
end

@doc """
Reload last P2P view from DB
"""
@spec reload_last_view(last_sync_date :: DateTime.t() | nil) :: :ok
defdelegate reload_last_view(last_sync_date), to: MemTableLoader, as: :load_p2p_view

@doc """
List the nodes registered.
"""
Expand Down
15 changes: 13 additions & 2 deletions lib/archethic/p2p/mem_table.ex
Original file line number Diff line number Diff line change
Expand Up @@ -714,11 +714,22 @@ defmodule Archethic.P2P.MemTable do
when is_binary(first_public_key) do
Logger.info("Node globally available", node: Base.encode16(first_public_key))

# When a node bootstrap, before starting its self repair, it load the current p2p view of the network.
# But then when the self repair occurs, the node needs to know the p2p view at the summary time so we need to update
# the node availability update to match the one it was at the time of the current repairing summary.
# So if a node wants to update a node already available but with a lower availability_update, it means that
# the node is performing a self repair from the bootstrap so we update the date

availability_pos = Keyword.fetch!(@discovery_index_position, :available?)
availability_update_pos = Keyword.fetch!(@discovery_index_position, :availability_update)

unless :ets.lookup_element(@discovery_table, first_public_key, availability_pos) do
availability_update_pos = Keyword.fetch!(@discovery_index_position, :availability_update)
already_available? = :ets.lookup_element(@discovery_table, first_public_key, availability_pos)

availability_update_from_bootstrap? =
:ets.lookup_element(@discovery_table, first_public_key, availability_update_pos)
|> DateTime.compare(availability_update) == :gt

if not already_available? or availability_update_from_bootstrap? do
:ets.update_element(@discovery_table, first_public_key, [
{availability_pos, true},
{availability_update_pos, availability_update}
Expand Down
5 changes: 3 additions & 2 deletions lib/archethic/p2p/mem_table_loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ defmodule Archethic.P2P.MemTableLoader do
{:ok, %{}}
end

defp load_p2p_view(nil), do: :ok
@spec load_p2p_view(DateTime.t() | nil) :: :ok
def load_p2p_view(nil), do: :ok

defp load_p2p_view(last_repair_time) do
def load_p2p_view(last_repair_time) do
next_repair_time =
:archethic
|> Application.get_env(SelfRepairScheduler, [])
Expand Down
16 changes: 7 additions & 9 deletions lib/archethic/self_repair/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,17 @@ defmodule Archethic.SelfRepair.Sync do

download_nodes = P2P.authorized_and_available_nodes(last_summary_time, true)

summaries_aggregates =
fetch_summaries_aggregates(last_sync_date, last_summary_time, download_nodes)
# Process first the old aggregates
fetch_summaries_aggregates(last_sync_date, last_summary_time, download_nodes)
|> Enum.each(&process_summary_aggregate(&1, download_nodes))

# Then process the last one to have the last P2P view
last_aggregate = BeaconChain.fetch_and_aggregate_summaries(last_summary_time, download_nodes)
ensure_download_last_aggregate(last_aggregate, download_nodes)

last_aggregate =
aggregate_with_local_summaries(last_aggregate, last_summary_time)
|> verify_attestations_threshold()

summaries_aggregates
|> Stream.concat([last_aggregate])
|> Enum.each(&process_summary_aggregate(&1, download_nodes))
aggregate_with_local_summaries(last_aggregate, last_summary_time)
|> verify_attestations_threshold()
|> process_summary_aggregate(download_nodes)

:telemetry.execute([:archethic, :self_repair], %{duration: System.monotonic_time() - start})
Archethic.Bootstrap.NetworkConstraints.persist_genesis_address()
Expand Down
31 changes: 26 additions & 5 deletions test/archethic/bootstrap/sync_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -332,18 +332,38 @@ defmodule Archethic.Bootstrap.SyncTest do
end
end

test "load_node_list/1 should request node list from the closest nodes" do
test "load_node_list/0 should request node list from the closest nodes" do
node = %Node{
ip: {80, 10, 101, 202},
port: 4390,
http_port: 4000,
first_public_key: "key1",
last_public_key: "key1",
availability_history: <<1::1>>
availability_history: <<1::1>>,
authorized?: true,
available?: true,
authorization_date: DateTime.utc_now(),
enrollment_date: DateTime.utc_now(),
network_patch: "AAA"
}

:ok = P2P.add_and_connect_node(node)

first_public_key = Crypto.first_node_public_key()

node2 = %Node{
ip: {127, 0, 0, 1},
port: 3000,
http_port: 4000,
first_public_key: first_public_key,
last_public_key: Crypto.last_node_public_key(),
enrollment_date: DateTime.utc_now(),
authorized?: false,
network_patch: "AAA"
}

:ok = P2P.add_and_connect_node(node2)

MockClient
|> stub(:send_message, fn
_, %ListNodes{}, _ ->
Expand All @@ -361,10 +381,11 @@ defmodule Archethic.Bootstrap.SyncTest do
}}
end)

assert :ok = Sync.load_node_list([node])
assert :ok = Sync.load_node_list()

assert [
node,
%Node{first_public_key: ^first_public_key},
%Node{first_public_key: "key1"},
%Node{
ip: {127, 0, 0, 1},
port: 3000,
Expand All @@ -373,7 +394,7 @@ defmodule Archethic.Bootstrap.SyncTest do
last_public_key: "key2",
availability_history: <<2::2>>
}
] == P2P.list_nodes()
] = P2P.list_nodes()
end

test "load_storage_nonce/1 should fetch the storage nonce, decrypt it with the node key" do
Expand Down