Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix connection infos overwrite #1099

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ defmodule Archethic.Bootstrap do
Logger.info("Synchronization finished")
end

# Connect nodes after all synchronization are finished
# so we have the latest connection infos available at this time
Logger.info("Try connection on all nodes")
P2P.list_nodes() |> P2P.connect_nodes()

Archethic.Bootstrap.NetworkConstraints.persist_genesis_address()

Logger.info("Enforced Resync: Started!")
Expand Down
41 changes: 23 additions & 18 deletions lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,42 +45,47 @@ defmodule Archethic.P2P do

@doc """
Establish a connection on a list of node in parallel
This function wait for the connections to be established
timeout for connection is 5 sec
"""
@spec connect_nodes(list(Node.t())) :: :ok
def connect_nodes(nodes) do
not_connected_nodes = Enum.reject(nodes, &node_connected?/1)

Task.Supervisor.async_stream(
TaskSupervisor,
nodes,
not_connected_nodes,
fn node = %Node{first_public_key: first_public_key} ->
unless node_connected?(node) do
do_connect_node(node)
do_connect_node(node, self())

receive do
:connected ->
:ok
receive do
:connected ->
:ok

{:error, reason} ->
Logger.warning("First connection attempt failed for #{inspect(reason)}",
node: Base.encode16(first_public_key)
)
end
{:error, reason} ->
Logger.warning("First connection attempt failed for #{inspect(reason)}",
node: Base.encode16(first_public_key)
)
end
end,
on_timeout: :kill_task
)
|> Stream.run()
end

defp do_connect_node(%Node{
ip: ip,
port: port,
transport: transport,
first_public_key: first_public_key
}) do
defp do_connect_node(
%Node{
ip: ip,
port: port,
transport: transport,
first_public_key: first_public_key
},
from \\ nil
) do
if first_public_key == Crypto.first_node_public_key() do
:ok
else
{:ok, _pid} = Client.new_connection(ip, port, transport, first_public_key)
{:ok, _pid} = Client.new_connection(ip, port, transport, first_public_key, from)
:ok
end
end
Expand Down
7 changes: 4 additions & 3 deletions lib/archethic/p2p/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ defmodule Archethic.P2P.Client do
use Knigge, otp_app: :archethic, default: DefaultImpl

@callback new_connection(
:inet.ip_address(),
ip :: :inet.ip_address(),
port :: :inet.port_number(),
P2P.supported_transport(),
Crypto.key()
transport :: P2P.supported_transport(),
node_first_public_key :: Crypto.key(),
from :: pid() | nil
) :: Supervisor.on_start()

@callback send_message(
Expand Down
1 change: 0 additions & 1 deletion lib/archethic/p2p/client/connection/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ defmodule Archethic.P2P.Client.ConnectionSupervisor do

def add_connection(opts \\ []) do
node_public_key = Keyword.get(opts, :node_public_key)
opts = Keyword.put(opts, :from, self())

DynamicSupervisor.start_child(
__MODULE__,
Expand Down
40 changes: 28 additions & 12 deletions lib/archethic/p2p/client/default_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,53 @@ defmodule Archethic.P2P.Client.DefaultImpl do
"""
@impl Client
@spec new_connection(
:inet.ip_address(),
ip :: :inet.ip_address(),
port :: :inet.port_number(),
P2P.supported_transport(),
Crypto.key()
transport :: P2P.supported_transport(),
node_first_public_key :: Crypto.key(),
from :: pid() | nil
) :: Supervisor.on_start()
def new_connection(ip, port, transport, node_public_key) do
def new_connection(ip, port, transport, node_public_key, from) do
case ConnectionSupervisor.add_connection(
transport: transport_mod(transport),
ip: ip,
port: port,
node_public_key: node_public_key
node_public_key: node_public_key,
from: from
) do
{:ok, pid} ->
{:ok, pid}

{:error, {:already_started, pid}} ->
# restart the connection if informations are updated
{_state, %{ip: conn_ip, port: conn_port, transport: conn_transport}} = :sys.get_state(pid)
{_state, current_conn} = :sys.get_state(pid)

with true <- conn_ip == ip,
true <- conn_port == port,
true <- conn_transport == transport_mod(transport) do
{:ok, pid}
with false <- connected?(node_public_key),
true <- informations_changed?(ip, port, transport, current_conn) do
ConnectionSupervisor.cancel_connection(pid, node_public_key)
new_connection(ip, port, transport, node_public_key, from)
else
_ ->
ConnectionSupervisor.cancel_connection(pid, node_public_key)
new_connection(ip, port, transport, node_public_key)
{:ok, pid}
end
end
end

defp informations_changed?(ip, port, transport, %{
ip: conn_ip,
port: conn_port,
transport: conn_transport
}) do
with true <- conn_ip == ip,
true <- conn_port == port,
true <- conn_transport == transport_mod(transport) do
false
else
_ ->
true
end
end

defp transport_mod(:tcp), do: TCPImpl
defp transport_mod(other), do: other

Expand Down
25 changes: 6 additions & 19 deletions lib/archethic/p2p/mem_table.ex
Original file line number Diff line number Diff line change
Expand Up @@ -283,27 +283,14 @@ defmodule Archethic.P2P.MemTable do
{Keyword.fetch!(@discovery_index_position, :last_public_key), last_public_key},
{Keyword.fetch!(@discovery_index_position, :reward_address), reward_address},
{Keyword.fetch!(@discovery_index_position, :last_address), last_address},
{Keyword.fetch!(@discovery_index_position, :origin_public_key), origin_public_key}
{Keyword.fetch!(@discovery_index_position, :origin_public_key), origin_public_key},
{Keyword.fetch!(@discovery_index_position, :ip), ip},
{Keyword.fetch!(@discovery_index_position, :port), port},
{Keyword.fetch!(@discovery_index_position, :http_port), http_port},
{Keyword.fetch!(@discovery_index_position, :transport), transport},
{Keyword.fetch!(@discovery_index_position, :last_update_date), timestamp}
]

# We change connection informations only if these infos are newer than the actual ones
timestamp_pos = Keyword.fetch!(@discovery_index_position, :last_update_date)
last_update_date = :ets.lookup_element(@discovery_table, first_public_key, timestamp_pos)

changes =
if DateTime.compare(timestamp, last_update_date) != :lt do
changes ++
[
{Keyword.fetch!(@discovery_index_position, :ip), ip},
{Keyword.fetch!(@discovery_index_position, :port), port},
{Keyword.fetch!(@discovery_index_position, :http_port), http_port},
{Keyword.fetch!(@discovery_index_position, :transport), transport},
{timestamp_pos, timestamp}
]
else
changes
end

changes =
if geo_patch != nil do
[{Keyword.fetch!(@discovery_index_position, :geo_patch), geo_patch} | changes]
Expand Down
9 changes: 0 additions & 9 deletions lib/archethic/p2p/mem_table_loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ defmodule Archethic.P2P.MemTableLoader do

alias Archethic.DB

alias Archethic.P2P.Client
alias Archethic.P2P.GeoPatch
alias Archethic.P2P.MemTable
alias Archethic.P2P.Node
Expand Down Expand Up @@ -148,14 +147,6 @@ defmodule Archethic.P2P.MemTableLoader do
end

Logger.info("Node loaded into in memory p2p tables", node: Base.encode16(first_public_key))

if first_public_key != Crypto.first_node_public_key() do
{:ok, %Node{ip: ip, port: port, transport: transport}} = MemTable.get_node(first_public_key)
{:ok, _pid} = Client.new_connection(ip, port, transport, first_public_key)
:ok
else
:ok
end
end

def load_transaction(%Transaction{
Expand Down
7 changes: 4 additions & 3 deletions lib/archethic/self_repair.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ defmodule Archethic.SelfRepair do
try do
:ok = Sync.load_missed_transactions(last_sync_date, download_nodes)
{:halt, :ok}
catch
error, message ->
Logger.error("Error during self repair #{inspect(error)} #{inspect(message)}")
rescue
error ->
Logger.error("Error during bootstrap self repair")
Logger.error(Exception.format(:error, error, __STACKTRACE__))
{:cont, :error}
end
end)
Expand Down
40 changes: 40 additions & 0 deletions test/archethic/p2p/client/default_impl_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule Archethic.P2P.Client.DefaultImplTest do
use ArchethicCase

alias Archethic.Crypto

alias Archethic.P2P.Client.ConnectionSupervisor
alias Archethic.P2P.Client.DefaultImpl

describe "new_connection/4" do
test "should not cancel connection if already connected even if infos are updated" do
ip = {127, 0, 0, 1}
port = 3002
transport = :tcp
node_public_key = Crypto.first_node_public_key()
{:ok, pid} = DefaultImpl.new_connection(ip, port, transport, node_public_key)

# get_state to wait connection process finished
:sys.get_state(pid)
ConnectionSupervisor.set_node_connected(node_public_key)

assert {:ok, ^pid} = DefaultImpl.new_connection(ip, 3003, transport, node_public_key)
end

test "should cancel connection and create new one if node not connected and infos are updated" do
ip = {127, 0, 0, 1}
port = 3002
transport = :tcp
node_public_key = Crypto.first_node_public_key()
{:ok, pid1} = DefaultImpl.new_connection(ip, port, transport, node_public_key)

# get_state to wait connection process finished
:sys.get_state(pid1)
ConnectionSupervisor.set_node_disconnected(node_public_key)

{:ok, pid2} = DefaultImpl.new_connection(ip, 3003, transport, node_public_key)

assert pid1 != pid2
end
end
end