Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve local connection #1098

Merged
merged 8 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ defmodule Archethic do
|> Enum.reject(&(&1.first_public_key == welcome_node_key))
|> Enum.sort_by(& &1.first_public_key)
|> P2P.nearest_nodes(welcome_node_patch)
|> Enum.filter(&Node.locally_available?/1)
|> Enum.filter(&P2P.node_connected?/1)

this_node = Crypto.first_node_public_key()

Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ defmodule Archethic.BeaconChain do
|> Enum.map(fn subset ->
subset
|> Election.beacon_storage_nodes(next_summary_date, authorized_nodes)
|> Enum.filter(&Node.locally_available?/1)
|> Enum.filter(&P2P.node_connected?/1)
|> P2P.nearest_nodes()
|> Enum.take(3)
|> Enum.map(&{&1, subset})
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ defmodule Archethic.Bootstrap do
bootstrapping_seeds

nodes ->
Enum.each(nodes, &P2P.connect_node/1)
P2P.connect_nodes(nodes)
{:ok, closest_nodes} = Sync.get_closest_nodes_and_renew_seeds(nodes, network_patch)
closest_nodes
end
Expand Down
28 changes: 3 additions & 25 deletions lib/archethic/bootstrap/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ defmodule Archethic.Bootstrap.Sync do

alias Archethic.SharedSecrets

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction

Expand Down Expand Up @@ -138,20 +136,7 @@ defmodule Archethic.Bootstrap.Sync do
def connect_current_node(closest_nodes) do
case P2P.fetch_nodes_list(true, closest_nodes) do
{:ok, nodes} ->
nodes =
Task.Supervisor.async_stream(
TaskSupervisor,
nodes,
fn node ->
P2P.connect_node(node)
# Wait connection time
Process.sleep(500)
%Node{node | availability_history: <<1::1>>}
end
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.map(fn {:ok, node} -> node end)

P2P.connect_nodes(nodes)
Logger.info("Node list refreshed")
{:ok, nodes}

Expand Down Expand Up @@ -207,15 +192,8 @@ defmodule Archethic.Bootstrap.Sync do
(new_seeds ++ closest_nodes)
|> P2P.distinct_nodes()
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))
|> Task.async_stream(fn node ->
P2P.connect_node(node)
# Wait for connection time
Process.sleep(500)
# Set node locally available after connection
%Node{node | availability_history: <<1::1>>}
end)
|> Enum.filter(&match?({:ok, _}, &1))
|> Enum.map(fn {:ok, node} -> node end)

P2P.connect_nodes(closest_nodes)

Logger.info("Closest nodes and seeds loaded in the P2P view")

Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/mining/pending_transaction_validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ defmodule Archethic.Mining.PendingTransactionValidation do
previous_address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
|> Enum.filter(&P2P.node_connected?/1)
|> get_first_public_key(previous_address)
end

Expand Down
20 changes: 10 additions & 10 deletions lib/archethic/mining/validation_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,20 @@ defmodule Archethic.Mining.ValidationContext do
iex> ValidationContext.new(
...> transaction: %Transaction{},
...> welcome_node: %Node{last_public_key: "key1", availability_history: <<1::1>>},
...> coordinator_node: %Node{last_public_key: "key2", availability_history: <<1::1>>},
...> cross_validation_nodes: [%Node{last_public_key: "key3", availability_history: <<1::1>>}],
...> chain_storage_nodes: [%Node{last_public_key: "key4", availability_history: <<1::1>>}, %Node{last_public_key: "key5", availability_history: <<1::1>>}],
...> beacon_storage_nodes: [%Node{last_public_key: "key6", availability_history: <<1::1>>}, %Node{last_public_key: "key7", availability_history: <<1::1>>}]
...> welcome_node: %Node{last_public_key: "key1"},
...> coordinator_node: %Node{last_public_key: "key2"},
...> cross_validation_nodes: [%Node{last_public_key: "key3"}],
...> chain_storage_nodes: [%Node{last_public_key: "key4"}, %Node{last_public_key: "key5"}],
...> beacon_storage_nodes: [%Node{last_public_key: "key6"}, %Node{last_public_key: "key7"}]
...> )
%ValidationContext{
transaction: %Transaction{},
welcome_node: %Node{last_public_key: "key1", availability_history: <<1::1>>},
coordinator_node: %Node{last_public_key: "key2", availability_history: <<1::1>>},
cross_validation_nodes: [%Node{last_public_key: "key3", availability_history: <<1::1>>}],
welcome_node: %Node{last_public_key: "key1"},
coordinator_node: %Node{last_public_key: "key2"},
cross_validation_nodes: [%Node{last_public_key: "key3"}],
cross_validation_nodes_confirmation: <<0::1>>,
chain_storage_nodes: [%Node{last_public_key: "key4", availability_history: <<1::1>>}, %Node{last_public_key: "key5", availability_history: <<1::1>>}],
beacon_storage_nodes: [%Node{last_public_key: "key6", availability_history: <<1::1>>}, %Node{last_public_key: "key7", availability_history: <<1::1>>}]
chain_storage_nodes: [%Node{last_public_key: "key4"}, %Node{last_public_key: "key5"}],
beacon_storage_nodes: [%Node{last_public_key: "key6"}, %Node{last_public_key: "key7"}]
}
"""
@spec new(opts :: Keyword.t()) :: t()
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/networking/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ defmodule Archethic.Networking.Scheduler do
defp handle_new_ip(%Transaction{address: tx_address, data: transaction_data}) do
nodes =
P2P.authorized_and_available_nodes()
|> Enum.filter(&Node.locally_available?/1)
|> Enum.filter(&P2P.node_connected?/1)
|> P2P.nearest_nodes()

case Utils.await_confirmation(tx_address, nodes) do
Expand Down
81 changes: 47 additions & 34 deletions lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,43 @@ defmodule Archethic.P2P do
def add_and_connect_node(node = %Node{first_public_key: first_public_key}) do
:ok = MemTable.add_node(node)
node = get_node_info!(first_public_key)
connect_node(node)
do_connect_node(node)
end

@doc """
Establish a connection with a node
Establish a connection on a list of node in parallel
"""
@spec connect_node(Node.t()) :: :ok
def connect_node(%Node{
ip: ip,
port: port,
transport: transport,
first_public_key: first_public_key
}) do
@spec connect_nodes(list(Node.t())) :: :ok
def connect_nodes(nodes) do
Task.Supervisor.async_stream(
TaskSupervisor,
nodes,
fn node = %Node{first_public_key: first_public_key} ->
unless node_connected?(node) do
do_connect_node(node)

receive do
:connected ->
:ok

{:error, reason} ->
Logger.warning("First connection attempt failed for #{inspect(reason)}",
node: Base.encode16(first_public_key)
)
end
end
end,
on_timeout: :kill_task
)
|> Stream.run()
end

defp do_connect_node(%Node{
ip: ip,
port: port,
transport: transport,
first_public_key: first_public_key
}) do
if first_public_key == Crypto.first_node_public_key() do
:ok
else
Expand All @@ -61,6 +85,18 @@ defmodule Archethic.P2P do
end
end

@doc """
Return true if the node connection is established
"""
@spec node_connected?(node :: Node.t()) :: boolean()
def node_connected?(%Node{first_public_key: first_public_key}) do
if first_public_key == Crypto.first_node_public_key() do
true
else
Client.connected?(first_public_key)
end
end

@doc """
List the nodes registered.
"""
Expand Down Expand Up @@ -496,29 +532,6 @@ defmodule Archethic.P2P do
@spec new_bootstrapping_seeds(list(Node.t())) :: :ok
defdelegate new_bootstrapping_seeds(nodes), to: BootstrappingSeeds, as: :update

@doc """
Create a binary sequence from a list node and set bit regarding their availability

## Examples

iex> P2P.nodes_availability_as_bits([
...> %Node{availability_history: <<1::1, 0::1>>},
...> %Node{availability_history: <<0::1, 1::1>>},
...> %Node{availability_history: <<1::1, 0::1>>}
...> ])
<<1::1, 0::1, 1::1>>
"""
@spec nodes_availability_as_bits(list(Node.t())) :: bitstring()
def nodes_availability_as_bits(node_list) when is_list(node_list) do
Enum.reduce(node_list, <<>>, fn node, acc ->
if Node.locally_available?(node) do
<<acc::bitstring, 1::1>>
else
<<acc::bitstring, 0::1>>
end
end)
end

@doc """
Create a sequence of bits from a list of node and a subset
by setting the bit where the subset node if found
Expand Down Expand Up @@ -573,7 +586,7 @@ defmodule Archethic.P2P do
previous_public_key
|> TransactionChain.get_first_public_key()
|> get_node_info!()
|> connect_node()
|> do_connect_node()
end

def load_transaction(tx), do: MemTableLoader.load_transaction(tx)
Expand Down Expand Up @@ -695,7 +708,7 @@ defmodule Archethic.P2P do
consistency_level
) do
nodes
|> Enum.filter(&Node.locally_available?/1)
|> Enum.filter(&node_connected?/1)
|> nearest_nodes()
|> unprioritize_node(Crypto.first_node_public_key())
|> do_quorum_read(
Expand Down
2 changes: 2 additions & 0 deletions lib/archethic/p2p/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ defmodule Archethic.P2P.Client do
| {:error, :closed}

@callback get_availability_timer(Crypto.key(), boolean()) :: non_neg_integer()

@callback connected?(Crypto.key()) :: boolean()
end
Loading