Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bootsrap p2p view #1093

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 41 additions & 61 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,32 @@ defmodule Archethic.Bootstrap do
P2P.get_geo_patch(ip)
end

node_first_public_key = Crypto.first_node_public_key()

closest_bootstrapping_nodes =
case bootstrapping_seeds do
[%Node{first_public_key: ^node_first_public_key}] ->
bootstrapping_seeds

nodes ->
Enum.each(nodes, &P2P.connect_node/1)
{:ok, closest_nodes} = Sync.get_closest_nodes_and_renew_seeds(nodes, network_patch)
closest_nodes
end

if should_bootstrap?(ip, port, http_port, transport, last_sync_date) do
start_bootstrap(
ip,
port,
http_port,
transport,
bootstrapping_seeds,
network_patch,
closest_bootstrapping_nodes,
reward_address
)
else
post_bootstrap()
end

post_bootstrap(closest_bootstrapping_nodes)

Logger.info("Bootstrapping finished!")
end

Expand Down Expand Up @@ -148,14 +160,13 @@ defmodule Archethic.Bootstrap do
port,
http_port,
transport,
bootstrapping_seeds,
network_patch,
closest_bootstrapping_nodes,
configured_reward_address
) do
Logger.info("Bootstrapping starting")

cond do
Sync.should_initialize_network?(bootstrapping_seeds) ->
Sync.should_initialize_network?(closest_bootstrapping_nodes) ->
Logger.info("This node should initialize the network!!")
Logger.debug("Create first node transaction")

Expand All @@ -170,7 +181,6 @@ defmodule Archethic.Bootstrap do

Sync.initialize_network(tx)

post_bootstrap()
SelfRepair.put_last_sync_date(DateTime.utc_now())

Crypto.first_node_public_key() == Crypto.previous_node_public_key() ->
Expand All @@ -181,13 +191,10 @@ defmodule Archethic.Bootstrap do
port,
http_port,
transport,
network_patch,
bootstrapping_seeds,
closest_bootstrapping_nodes,
configured_reward_address
)

post_bootstrap()

true ->
Logger.info("Update node chain...")

Expand All @@ -204,23 +211,20 @@ defmodule Archethic.Bootstrap do
port,
http_port,
transport,
network_patch,
bootstrapping_seeds,
closest_bootstrapping_nodes,
last_reward_address
)

post_bootstrap()
end
end

defp post_bootstrap() do
defp post_bootstrap(closest_bootstrapping_nodes) do
last_sync_date = SelfRepair.last_sync_date()

if SelfRepair.missed_sync?(last_sync_date) do
Logger.info("Synchronization started")
# Always load the current node list to have the current view for downloading transaction
:ok = Sync.load_node_list()
:ok = SelfRepair.bootstrap_sync(last_sync_date)
{:ok, current_nodes} = Sync.connect_current_node(closest_bootstrapping_nodes)
:ok = SelfRepair.bootstrap_sync(last_sync_date, current_nodes)
Logger.info("Synchronization finished")
end

Expand All @@ -241,34 +245,25 @@ defmodule Archethic.Bootstrap do
port,
http_port,
transport,
patch,
bootstrapping_seeds,
closest_bootstrapping_nodes,
configured_reward_address
) do
Enum.each(bootstrapping_seeds, &P2P.add_and_connect_node/1)

{:ok, closest_nodes} = Sync.get_closest_nodes_and_renew_seeds(bootstrapping_seeds, patch)

closest_nodes =
closest_nodes
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))

# In case node had lose it's DB, we ask the network if the node chain already exists
{:ok, length} =
Crypto.first_node_public_key()
|> Crypto.derive_address()
|> TransactionChain.fetch_size_remotely(closest_nodes)
|> TransactionChain.fetch_size_remotely(closest_bootstrapping_nodes)

Crypto.set_node_key_index(length)

reward_address =
if length > 0 do
{:ok, last_address} =
Crypto.derive_address(Crypto.first_node_public_key())
|> TransactionChain.fetch_last_address_remotely(closest_nodes)
|> TransactionChain.fetch_last_address_remotely(closest_bootstrapping_nodes)

{:ok, %Transaction{data: %TransactionData{content: content}}} =
TransactionChain.fetch_transaction_remotely(last_address, closest_nodes)
TransactionChain.fetch_transaction_remotely(last_address, closest_bootstrapping_nodes)

{:ok, _ip, _p2p_port, _http_port, _transport, last_reward_address, _origin_public_key,
_key_certificate} = Node.decode_transaction_content(content)
Expand All @@ -281,41 +276,26 @@ defmodule Archethic.Bootstrap do
tx =
TransactionHandler.create_node_transaction(ip, port, http_port, transport, reward_address)

{:ok, validated_tx} = TransactionHandler.send_transaction(tx, closest_nodes)
{:ok, validated_tx} = TransactionHandler.send_transaction(tx, closest_bootstrapping_nodes)

:ok = Sync.load_storage_nonce(closest_nodes)
:ok = Sync.load_storage_nonce(closest_bootstrapping_nodes)

Replication.sync_transaction_chain(validated_tx, closest_nodes)
Replication.sync_transaction_chain(validated_tx, closest_bootstrapping_nodes)
end

defp update_node(ip, port, http_port, transport, patch, bootstrapping_seeds, reward_address) do
case Enum.reject(
bootstrapping_seeds,
&(&1.first_public_key == Crypto.first_node_public_key())
) do
[] ->
Logger.warning("Not enough nodes in the network. No node update")

_ ->
{:ok, closest_nodes} = Sync.get_closest_nodes_and_renew_seeds(bootstrapping_seeds, patch)

closest_nodes =
closest_nodes
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))

tx =
TransactionHandler.create_node_transaction(
ip,
port,
http_port,
transport,
reward_address
)
defp update_node(ip, port, http_port, transport, closest_bootstrapping_nodes, reward_address) do
tx =
TransactionHandler.create_node_transaction(
ip,
port,
http_port,
transport,
reward_address
)

{:ok, validated_tx} = TransactionHandler.send_transaction(tx, closest_nodes)
{:ok, validated_tx} = TransactionHandler.send_transaction(tx, closest_bootstrapping_nodes)

Replication.sync_transaction_chain(validated_tx, closest_nodes)
end
Replication.sync_transaction_chain(validated_tx, closest_bootstrapping_nodes)
end

@doc """
Expand Down
46 changes: 34 additions & 12 deletions lib/archethic/bootstrap/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ defmodule Archethic.Bootstrap.Sync do
alias Archethic.P2P.Message.NotifyEndOfNodeSync
alias Archethic.P2P.Node

alias Archethic.SelfRepair

alias Archethic.SharedSecrets

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction

Expand Down Expand Up @@ -133,15 +133,27 @@ defmodule Archethic.Bootstrap.Sync do
@doc """
Fetch and load the nodes list
"""
@spec load_node_list() :: :ok | {:error, :network_issue}
def load_node_list() do
case P2P.fetch_nodes_list() do
@spec connect_current_node(closest_nodes :: list(Node.t())) ::
{:ok, list(Node.t())} | {:error, :network_issue}
def connect_current_node(closest_nodes) do
case P2P.fetch_nodes_list(true, closest_nodes) do
{:ok, nodes} ->
Enum.each(nodes, &P2P.add_and_connect_node/1)
# After loading all current nodes, we update the p2p view with the last one stored in DB
# to have a proper view for the next beacon summary to self repair
SelfRepair.last_sync_date() |> P2P.reload_last_view()
nodes =
Task.Supervisor.async_stream(
TaskSupervisor,
nodes,
fn node ->
P2P.connect_node(node)
# Wait connection time
Process.sleep(500)
%Node{node | availability_history: <<1::1>>}
end
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.map(fn {:ok, node} -> node end)

Logger.info("Node list refreshed")
{:ok, nodes}

{:error, :network_issue} ->
{:error, :network_issue}
Expand Down Expand Up @@ -180,17 +192,27 @@ defmodule Archethic.Bootstrap.Sync do
{:ok, list(Node.t())} | {:error, :network_issue}
def get_closest_nodes_and_renew_seeds([node | rest], patch) do
case P2P.send_message(node, %GetBootstrappingNodes{patch: patch}) do
{:ok, %BootstrappingNodes{closest_nodes: closest_nodes, new_seeds: new_seeds}} ->
{:ok,
%BootstrappingNodes{
closest_nodes: closest_nodes,
new_seeds: new_seeds,
first_enrolled_node: first_enrolled_node
}} ->
:ok = P2P.new_bootstrapping_seeds(new_seeds)

P2P.add_and_connect_node(first_enrolled_node)
Logger.info("First enrolled node added into P2P MemTable")

closest_nodes =
(new_seeds ++ closest_nodes)
|> P2P.distinct_nodes()
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))
|> Task.async_stream(fn node ->
P2P.add_and_connect_node(node)
P2P.connect_node(node)
# Wait for connection time
Process.sleep(500)
P2P.get_node_info!(node.first_public_key)
# Set node locally available after connection
%Node{node | availability_history: <<1::1>>}
end)
|> Enum.filter(&match?({:ok, _}, &1))
|> Enum.map(fn {:ok, node} -> node end)
Expand Down
14 changes: 2 additions & 12 deletions lib/archethic/bootstrap/transaction_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ defmodule Archethic.Bootstrap.TransactionHandler do

alias Archethic.Crypto

alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Message.NewTransaction
Expand Down Expand Up @@ -32,7 +30,7 @@ defmodule Archethic.Bootstrap.TransactionHandler do
end

defp do_send_transaction(
[node | rest],
nodes = [node | rest],
tx = %Transaction{address: address, type: type, data: transaction_data}
) do
case P2P.send_message(node, %NewTransaction{
Expand All @@ -45,15 +43,7 @@ defmodule Archethic.Bootstrap.TransactionHandler do
transaction_type: type
)

storage_nodes =
Election.chain_storage_nodes_with_type(
address,
type,
P2P.authorized_and_available_nodes()
)
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))

case Utils.await_confirmation(address, storage_nodes) do
case Utils.await_confirmation(address, nodes) do
{:ok, validated_transaction = %Transaction{address: ^address, data: ^transaction_data}} ->
{:ok, validated_transaction}

Expand Down
37 changes: 20 additions & 17 deletions lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,19 @@ defmodule Archethic.P2P do
def add_and_connect_node(node = %Node{first_public_key: first_public_key}) do
:ok = MemTable.add_node(node)
node = get_node_info!(first_public_key)
do_connect_node(node)
connect_node(node)
end

defp do_connect_node(%Node{
ip: ip,
port: port,
transport: transport,
first_public_key: first_public_key
}) do
@doc """
Establish a connection with a node
"""
@spec connect_node(Node.t()) :: :ok
def connect_node(%Node{
ip: ip,
port: port,
transport: transport,
first_public_key: first_public_key
}) do
if first_public_key == Crypto.first_node_public_key() do
:ok
else
Expand All @@ -57,12 +61,6 @@ defmodule Archethic.P2P do
end
end

@doc """
Reload last P2P view from DB
"""
@spec reload_last_view(last_sync_date :: DateTime.t() | nil) :: :ok
defdelegate reload_last_view(last_sync_date), to: MemTableLoader, as: :load_p2p_view

@doc """
List the nodes registered.
"""
Expand All @@ -72,8 +70,9 @@ defmodule Archethic.P2P do
@doc """
Fetch the list of nodes from close nodes
"""
@spec fetch_nodes_list() :: {:ok, list(Node.t())} | {:error, :network_issue}
def fetch_nodes_list() do
@spec fetch_nodes_list(authorized_and_available? :: boolean(), node_list :: list(Node.t())) ::
{:ok, list(Node.t())} | {:error, :network_issue}
def fetch_nodes_list(authorized_and_available?, nodes) do
last_updated_nodes =
fn new_node = %Node{
first_public_key: public_key,
Expand All @@ -100,7 +99,11 @@ defmodule Archethic.P2P do
%NodeList{nodes: nodes}
end

case quorum_read(authorized_and_available_nodes(), %ListNodes{}, conflict_resolver) do
case quorum_read(
nodes,
%ListNodes{authorized_and_available?: authorized_and_available?},
conflict_resolver
) do
{:ok, %NodeList{nodes: nodes}} ->
{:ok, nodes}

Expand Down Expand Up @@ -570,7 +573,7 @@ defmodule Archethic.P2P do
previous_public_key
|> TransactionChain.get_first_public_key()
|> get_node_info!()
|> do_connect_node()
|> connect_node()
end

def load_transaction(tx), do: MemTableLoader.load_transaction(tx)
Expand Down
Loading