Skip to content

Commit

Permalink
Do not load node in memtable during bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
Neylix committed Jun 13, 2023
1 parent 2246517 commit 8df39b4
Show file tree
Hide file tree
Showing 17 changed files with 241 additions and 381 deletions.
95 changes: 40 additions & 55 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,31 @@ defmodule Archethic.Bootstrap do
P2P.get_geo_patch(ip)
end

node_first_public_key = Crypto.first_node_public_key()

closest_bootstrapping_nodes =
case bootstrapping_seeds do
[%Node{first_public_key: ^node_first_public_key}] ->
bootstrapping_seeds

nodes ->
Enum.each(nodes, &P2P.connect_node/1)
{:ok, closest_nodes} = Sync.get_closest_nodes_and_renew_seeds(nodes, network_patch)
closest_nodes
end

if should_bootstrap?(ip, port, http_port, transport, last_sync_date) do
start_bootstrap(
ip,
port,
http_port,
transport,
bootstrapping_seeds,
network_patch,
closest_bootstrapping_nodes,
reward_address
)
end

post_bootstrap()
post_bootstrap(closest_bootstrapping_nodes)

Logger.info("Bootstrapping finished!")
end
Expand Down Expand Up @@ -148,14 +160,13 @@ defmodule Archethic.Bootstrap do
port,
http_port,
transport,
bootstrapping_seeds,
network_patch,
closest_bootstrapping_nodes,
configured_reward_address
) do
Logger.info("Bootstrapping starting")

cond do
Sync.should_initialize_network?(bootstrapping_seeds) ->
Sync.should_initialize_network?(closest_bootstrapping_nodes) ->
Logger.info("This node should initialize the network!!")
Logger.debug("Create first node transaction")

Expand All @@ -180,8 +191,7 @@ defmodule Archethic.Bootstrap do
port,
http_port,
transport,
network_patch,
bootstrapping_seeds,
closest_bootstrapping_nodes,
configured_reward_address
)

Expand All @@ -201,21 +211,20 @@ defmodule Archethic.Bootstrap do
port,
http_port,
transport,
network_patch,
bootstrapping_seeds,
closest_bootstrapping_nodes,
last_reward_address
)
end
end

defp post_bootstrap() do
defp post_bootstrap(closest_bootstrapping_nodes) do
last_sync_date = SelfRepair.last_sync_date()

if SelfRepair.missed_sync?(last_sync_date) do
Logger.info("Synchronization started")
# Always load the current node list to have the current view for downloading transaction
:ok = Sync.load_node_list()
:ok = SelfRepair.bootstrap_sync(last_sync_date)
{:ok, current_nodes} = Sync.connect_current_node(closest_bootstrapping_nodes)
:ok = SelfRepair.bootstrap_sync(last_sync_date, current_nodes)
Logger.info("Synchronization finished")
end

Expand All @@ -236,34 +245,25 @@ defmodule Archethic.Bootstrap do
port,
http_port,
transport,
patch,
bootstrapping_seeds,
closest_bootstrapping_nodes,
configured_reward_address
) do
Enum.each(bootstrapping_seeds, &P2P.add_and_connect_node/1)

{:ok, closest_nodes} = Sync.get_closest_nodes_and_renew_seeds(bootstrapping_seeds, patch)

closest_nodes =
closest_nodes
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))

# In case node had lose it's DB, we ask the network if the node chain already exists
{:ok, length} =
Crypto.first_node_public_key()
|> Crypto.derive_address()
|> TransactionChain.fetch_size_remotely(closest_nodes)
|> TransactionChain.fetch_size_remotely(closest_bootstrapping_nodes)

Crypto.set_node_key_index(length)

reward_address =
if length > 0 do
{:ok, last_address} =
Crypto.derive_address(Crypto.first_node_public_key())
|> TransactionChain.fetch_last_address_remotely(closest_nodes)
|> TransactionChain.fetch_last_address_remotely(closest_bootstrapping_nodes)

{:ok, %Transaction{data: %TransactionData{content: content}}} =
TransactionChain.fetch_transaction_remotely(last_address, closest_nodes)
TransactionChain.fetch_transaction_remotely(last_address, closest_bootstrapping_nodes)

{:ok, _ip, _p2p_port, _http_port, _transport, last_reward_address, _origin_public_key,
_key_certificate} = Node.decode_transaction_content(content)
Expand All @@ -276,41 +276,26 @@ defmodule Archethic.Bootstrap do
tx =
TransactionHandler.create_node_transaction(ip, port, http_port, transport, reward_address)

{:ok, validated_tx} = TransactionHandler.send_transaction(tx, closest_nodes)
{:ok, validated_tx} = TransactionHandler.send_transaction(tx, closest_bootstrapping_nodes)

:ok = Sync.load_storage_nonce(closest_nodes)
:ok = Sync.load_storage_nonce(closest_bootstrapping_nodes)

Replication.sync_transaction_chain(validated_tx, closest_nodes)
Replication.sync_transaction_chain(validated_tx, closest_bootstrapping_nodes)
end

defp update_node(ip, port, http_port, transport, patch, bootstrapping_seeds, reward_address) do
case Enum.reject(
bootstrapping_seeds,
&(&1.first_public_key == Crypto.first_node_public_key())
) do
[] ->
Logger.warning("Not enough nodes in the network. No node update")

_ ->
{:ok, closest_nodes} = Sync.get_closest_nodes_and_renew_seeds(bootstrapping_seeds, patch)

closest_nodes =
closest_nodes
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))

tx =
TransactionHandler.create_node_transaction(
ip,
port,
http_port,
transport,
reward_address
)
defp update_node(ip, port, http_port, transport, closest_bootstrapping_nodes, reward_address) do
tx =
TransactionHandler.create_node_transaction(
ip,
port,
http_port,
transport,
reward_address
)

{:ok, validated_tx} = TransactionHandler.send_transaction(tx, closest_nodes)
{:ok, validated_tx} = TransactionHandler.send_transaction(tx, closest_bootstrapping_nodes)

Replication.sync_transaction_chain(validated_tx, closest_nodes)
end
Replication.sync_transaction_chain(validated_tx, closest_bootstrapping_nodes)
end

@doc """
Expand Down
46 changes: 34 additions & 12 deletions lib/archethic/bootstrap/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ defmodule Archethic.Bootstrap.Sync do
alias Archethic.P2P.Message.NotifyEndOfNodeSync
alias Archethic.P2P.Node

alias Archethic.SelfRepair

alias Archethic.SharedSecrets

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction

Expand Down Expand Up @@ -133,15 +133,27 @@ defmodule Archethic.Bootstrap.Sync do
@doc """
Fetch and load the nodes list
"""
@spec load_node_list() :: :ok | {:error, :network_issue}
def load_node_list() do
case P2P.fetch_nodes_list() do
@spec connect_current_node(closest_nodes :: list(Node.t())) ::
{:ok, list(Node.t())} | {:error, :network_issue}
def connect_current_node(closest_nodes) do
case P2P.fetch_nodes_list(true, closest_nodes) do
{:ok, nodes} ->
Enum.each(nodes, &P2P.add_and_connect_node/1)
# After loading all current nodes, we update the p2p view with the last one stored in DB
# to have a proper view for the next beacon summary to self repair
SelfRepair.last_sync_date() |> P2P.reload_last_view()
nodes =
Task.Supervisor.async_stream(
TaskSupervisor,
nodes,
fn node ->
P2P.connect_node(node)
# Wait connection time
Process.sleep(500)
%Node{node | availability_history: <<1::1>>}
end
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.map(fn {:ok, node} -> node end)

Logger.info("Node list refreshed")
{:ok, nodes}

{:error, :network_issue} ->
{:error, :network_issue}
Expand Down Expand Up @@ -180,17 +192,27 @@ defmodule Archethic.Bootstrap.Sync do
{:ok, list(Node.t())} | {:error, :network_issue}
def get_closest_nodes_and_renew_seeds([node | rest], patch) do
case P2P.send_message(node, %GetBootstrappingNodes{patch: patch}) do
{:ok, %BootstrappingNodes{closest_nodes: closest_nodes, new_seeds: new_seeds}} ->
{:ok,
%BootstrappingNodes{
closest_nodes: closest_nodes,
new_seeds: new_seeds,
first_enrolled_node: first_enrolled_node
}} ->
:ok = P2P.new_bootstrapping_seeds(new_seeds)

P2P.add_and_connect_node(first_enrolled_node)
Logger.info("First enrolled node added into P2P MemTable")

closest_nodes =
(new_seeds ++ closest_nodes)
|> P2P.distinct_nodes()
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))
|> Task.async_stream(fn node ->
P2P.add_and_connect_node(node)
P2P.connect_node(node)
# Wait for connection time
Process.sleep(500)
P2P.get_node_info!(node.first_public_key)
# Set node locally available after connection
%Node{node | availability_history: <<1::1>>}
end)
|> Enum.filter(&match?({:ok, _}, &1))
|> Enum.map(fn {:ok, node} -> node end)
Expand Down
14 changes: 2 additions & 12 deletions lib/archethic/bootstrap/transaction_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ defmodule Archethic.Bootstrap.TransactionHandler do

alias Archethic.Crypto

alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Message.NewTransaction
Expand Down Expand Up @@ -32,7 +30,7 @@ defmodule Archethic.Bootstrap.TransactionHandler do
end

defp do_send_transaction(
[node | rest],
nodes = [node | rest],
tx = %Transaction{address: address, type: type, data: transaction_data}
) do
case P2P.send_message(node, %NewTransaction{
Expand All @@ -45,15 +43,7 @@ defmodule Archethic.Bootstrap.TransactionHandler do
transaction_type: type
)

storage_nodes =
Election.chain_storage_nodes_with_type(
address,
type,
P2P.authorized_and_available_nodes()
)
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))

case Utils.await_confirmation(address, storage_nodes) do
case Utils.await_confirmation(address, nodes) do
{:ok, validated_transaction = %Transaction{address: ^address, data: ^transaction_data}} ->
{:ok, validated_transaction}

Expand Down
37 changes: 20 additions & 17 deletions lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,19 @@ defmodule Archethic.P2P do
def add_and_connect_node(node = %Node{first_public_key: first_public_key}) do
:ok = MemTable.add_node(node)
node = get_node_info!(first_public_key)
do_connect_node(node)
connect_node(node)
end

defp do_connect_node(%Node{
ip: ip,
port: port,
transport: transport,
first_public_key: first_public_key
}) do
@doc """
Establish a connection with a node
"""
@spec connect_node(Node.t()) :: :ok
def connect_node(%Node{
ip: ip,
port: port,
transport: transport,
first_public_key: first_public_key
}) do
if first_public_key == Crypto.first_node_public_key() do
:ok
else
Expand All @@ -57,12 +61,6 @@ defmodule Archethic.P2P do
end
end

@doc """
Reload last P2P view from DB
"""
@spec reload_last_view(last_sync_date :: DateTime.t() | nil) :: :ok
defdelegate reload_last_view(last_sync_date), to: MemTableLoader, as: :load_p2p_view

@doc """
List the nodes registered.
"""
Expand All @@ -72,8 +70,9 @@ defmodule Archethic.P2P do
@doc """
Fetch the list of nodes from close nodes
"""
@spec fetch_nodes_list() :: {:ok, list(Node.t())} | {:error, :network_issue}
def fetch_nodes_list() do
@spec fetch_nodes_list(authorized_and_available? :: boolean(), node_list :: list(Node.t())) ::
{:ok, list(Node.t())} | {:error, :network_issue}
def fetch_nodes_list(authorized_and_available?, nodes) do
last_updated_nodes =
fn new_node = %Node{
first_public_key: public_key,
Expand All @@ -100,7 +99,11 @@ defmodule Archethic.P2P do
%NodeList{nodes: nodes}
end

case quorum_read(authorized_and_available_nodes(), %ListNodes{}, conflict_resolver) do
case quorum_read(
nodes,
%ListNodes{authorized_and_available?: authorized_and_available?},
conflict_resolver
) do
{:ok, %NodeList{nodes: nodes}} ->
{:ok, nodes}

Expand Down Expand Up @@ -570,7 +573,7 @@ defmodule Archethic.P2P do
previous_public_key
|> TransactionChain.get_first_public_key()
|> get_node_info!()
|> do_connect_node()
|> connect_node()
end

def load_transaction(tx), do: MemTableLoader.load_transaction(tx)
Expand Down
Loading

0 comments on commit 8df39b4

Please sign in to comment.