Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor P2P.connected? into Connection module #1100

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions lib/archethic/p2p/client/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ defmodule Archethic.P2P.Client.Connection do
alias Archethic.Crypto

alias Archethic.P2P.Client.ConnectionRegistry
alias Archethic.P2P.Client.ConnectionSupervisor

alias Archethic.P2P.Message
alias Archethic.P2P.MessageEnvelop
Expand All @@ -23,6 +22,7 @@ defmodule Archethic.P2P.Client.Connection do

use GenStateMachine, callback_mode: [:handle_event_function, :state_enter], restart: :temporary
@vsn Mix.Project.config()[:version]
@table_name :connection_status

@doc """
Starts a new connection
Expand Down Expand Up @@ -67,6 +67,23 @@ defmodule Archethic.P2P.Client.Connection do
GenStateMachine.call(via_tuple(public_key), {:get_timer, reset?})
end

@doc """
Return true if the connection is established
"""
@spec connected?(node_public_key :: Crypto.key()) :: boolean()
def connected?(node_public_key) do
case :ets.lookup(@table_name, node_public_key) do
[{_key, connected?}] -> connected?
_ -> false
end
end

defp set_node_connected(node_public_key),
do: :ets.insert(@table_name, {node_public_key, true})

defp set_node_disconnected(node_public_key),
do: :ets.insert(@table_name, {node_public_key, false})

# fetch connection details from registery for a node from its public key
defp via_tuple(public_key), do: {:via, Registry, {ConnectionRegistry, public_key}}

Expand Down Expand Up @@ -149,7 +166,7 @@ defmodule Archethic.P2P.Client.Connection do
) do
Logger.warning("Connection closed", node: Base.encode16(node_public_key))

ConnectionSupervisor.set_node_disconnected(node_public_key)
set_node_disconnected(node_public_key)

# Stop availability timer
new_data =
Expand Down Expand Up @@ -183,7 +200,7 @@ defmodule Archethic.P2P.Client.Connection do
{:connected, _socket},
data = %{node_public_key: node_public_key}
) do
ConnectionSupervisor.set_node_connected(node_public_key)
set_node_connected(node_public_key)

# Start availability timer
new_data =
Expand Down Expand Up @@ -344,7 +361,7 @@ defmodule Archethic.P2P.Client.Connection do
message_id: msg_id
)

ConnectionSupervisor.set_node_disconnected(node_public_key)
set_node_disconnected(node_public_key)

# Stop availability timer
new_data =
Expand Down Expand Up @@ -451,7 +468,7 @@ defmodule Archethic.P2P.Client.Connection do
{:next_state, :disconnected, data}

{:ok, msg} ->
ConnectionSupervisor.set_node_connected(node_public_key)
set_node_connected(node_public_key)

# Start availability timer
new_data =
Expand Down Expand Up @@ -518,18 +535,22 @@ defmodule Archethic.P2P.Client.Connection do
:keep_state_and_data
end

def terminate(_, _, %{node_public_key: node_public_key}) do
:ets.delete(@table_name, node_public_key)
end

def code_change(
"1.1.1",
state = {:connected, _},
data = %{node_public_key: node_public_key},
_extra
) do
ConnectionSupervisor.set_node_connected(node_public_key)
set_node_connected(node_public_key)
{:ok, state, data}
end

def code_change("1.1.1", state, data = %{node_public_key: node_public_key}, _extra) do
ConnectionSupervisor.set_node_disconnected(node_public_key)
set_node_disconnected(node_public_key)
{:ok, state, data}
end

Expand Down
42 changes: 2 additions & 40 deletions lib/archethic/p2p/client/connection/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,13 @@ defmodule Archethic.P2P.Client.ConnectionSupervisor do

use DynamicSupervisor

alias Archethic.Crypto

alias Archethic.P2P.Client.Connection

@table_name :connection_status

def start_link(arg \\ []) do
DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__)
end

def init(_) do
# Start an ets table to manage node connection status.
# This reduce the number of message sent to the Connection GenServer
if :ets.whereis(@table_name) == :undefined do
# Create ets table only if it doesn't exist (init of supervisor called in hot reload)
:ets.new(@table_name, [:named_table, :public, read_concurrency: true])
end

DynamicSupervisor.init(strategy: :one_for_one)
end

Expand All @@ -40,36 +29,9 @@ defmodule Archethic.P2P.Client.ConnectionSupervisor do
@doc """
Terminate a connection process
"""
@spec cancel_connection(pid :: pid(), node_public_key :: Crypto.key()) ::
@spec cancel_connection(pid :: pid()) ::
:ok | {:error, :not_found}
def cancel_connection(pid, node_public_key) do
:ets.delete(@table_name, node_public_key)

def cancel_connection(pid) do
DynamicSupervisor.terminate_child(__MODULE__, pid)
end

@doc """
Return true if the connection is established
"""
@spec node_connected?(node_public_key :: Crypto.key()) :: boolean()
def node_connected?(node_public_key) do
case :ets.lookup(@table_name, node_public_key) do
[{_key, connected?}] -> connected?
_ -> false
end
end

@doc """
Set node connection status to connected
"""
@spec set_node_connected(node_public_key :: Crypto.key()) :: boolean()
def set_node_connected(node_public_key),
do: :ets.insert(@table_name, {node_public_key, true})

@doc """
Set node connection status to disconnected
"""
@spec set_node_disconnected(node_public_key :: Crypto.key()) :: boolean()
def set_node_disconnected(node_public_key),
do: :ets.insert(@table_name, {node_public_key, false})
end
4 changes: 2 additions & 2 deletions lib/archethic/p2p/client/default_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Archethic.P2P.Client.DefaultImpl do

with false <- connected?(node_public_key),
true <- informations_changed?(ip, port, transport, current_conn) do
ConnectionSupervisor.cancel_connection(pid, node_public_key)
ConnectionSupervisor.cancel_connection(pid)
new_connection(ip, port, transport, node_public_key, from)
else
_ ->
Expand Down Expand Up @@ -109,5 +109,5 @@ defmodule Archethic.P2P.Client.DefaultImpl do
defdelegate get_availability_timer(public_key, reset?), to: Connection

@impl Client
defdelegate connected?(public_key), to: ConnectionSupervisor, as: :node_connected?
defdelegate connected?(public_key), to: Connection
end
9 changes: 9 additions & 0 deletions lib/archethic/p2p/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,20 @@ defmodule Archethic.P2P.Supervisor do

use Supervisor

@table_name :connection_status
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you should use a function in the Connection module to give the name of the table.


def start_link(args \\ []) do
Supervisor.start_link(__MODULE__, args, name: Archethic.P2PSupervisor)
end

def init(_args) do
# Start an ets table to manage node connection status.
# This reduce the number of message sent to the Connection GenServer
if :ets.whereis(@table_name) == :undefined do
Copy link
Member

@samuelmanzanera samuelmanzanera Jun 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to be cleaner, the best would be use a dedicated to connection information or status and might manage its own state and lifecycle.

# Create ets table only if it doesn't exist (init of supervisor called in hot reload)
:ets.new(@table_name, [:named_table, :public, read_concurrency: true])
end

optional_children = [
{Registry, name: ConnectionRegistry, keys: :unique},
ConnectionSupervisor,
Expand Down
3 changes: 0 additions & 3 deletions test/archethic/beacon_chain/subset/p2p_sampling_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ defmodule Archethic.BeaconChain.Subset.P2PSamplingTest do
node_availability_time = [600, 500, 365, 0]

MockClient
|> stub(:new_connection, fn _, _, _, _ ->
{:ok, self()}
end)
|> expect(:send_message, fn %Node{port: 3001}, %Ping{}, _ ->
Process.sleep(10)
{:ok, %Ok{}}
Expand Down
6 changes: 3 additions & 3 deletions test/archethic/bootstrap/sync_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ defmodule Archethic.Bootstrap.SyncTest do
test_with_mock "connect_current_node/1 should request node list from the closest nodes and connect to them",
Client,
[:passthrough],
new_connection: fn _, _, _, _ ->
send(self(), :connected)
new_connection: fn _, _, _, _, from ->
send(from, :connected)
{:ok, make_ref()}
end,
connected?: fn
Expand Down Expand Up @@ -394,7 +394,7 @@ defmodule Archethic.Bootstrap.SyncTest do

# Called 3 times 2 times with P2P.connect_nodes and 1 time with P2P.quorum
assert_called_exactly(Client.connected?("key1"), 3)
assert_called(Client.new_connection(:_, :_, :_, "key2"))
assert_called(Client.new_connection(:_, :_, :_, "key2", :_))
end

test "load_storage_nonce/1 should fetch the storage nonce, decrypt it with the node key" do
Expand Down
26 changes: 23 additions & 3 deletions test/archethic/p2p/client/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@ defmodule Archethic.P2P.Client.ConnectionTest do
assert {{:connected, _socket}, %{request_id: 0, messages: %{}}} = :sys.get_state(pid)
end

test "start_link/1 should acknowledge when connection is successful" do
me = self()

{:ok, pid} =
Connection.start_link(
transport: __MODULE__.MockTransport,
ip: {127, 0, 0, 1},
port: 3000,
node_public_key: "key1",
from: me
)

assert {:initializing, _} = :sys.get_state(pid)

Process.sleep(10)

assert {{:connected, _socket}, %{request_id: 0, messages: %{}}} = :sys.get_state(pid)
assert_received :connected
end

describe "send_message/3" do
test "should send the message and enqueue the request" do
{:ok, pid} =
Expand Down Expand Up @@ -512,7 +532,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do

Process.sleep(10)

assert ConnectionSupervisor.node_connected?(node_key)
assert Connection.connected?(node_key)

assert {:error, :timeout} =
Connection.send_message(
Expand All @@ -523,9 +543,9 @@ defmodule Archethic.P2P.Client.ConnectionTest do

Process.sleep(10)

refute ConnectionSupervisor.node_connected?(node_key)
refute Connection.connected?(node_key)

ConnectionSupervisor.cancel_connection(pid, node_key)
ConnectionSupervisor.cancel_connection(pid)
end
end

Expand Down
17 changes: 9 additions & 8 deletions test/archethic/p2p/client/default_impl_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,37 @@ defmodule Archethic.P2P.Client.DefaultImplTest do

alias Archethic.Crypto

alias Archethic.P2P.Client.ConnectionSupervisor
alias Archethic.P2P.Client.DefaultImpl

describe "new_connection/4" do
@table_name :connection_status

describe "new_connection/5" do
test "should not cancel connection if already connected even if infos are updated" do
ip = {127, 0, 0, 1}
port = 3002
transport = :tcp
node_public_key = Crypto.first_node_public_key()
{:ok, pid} = DefaultImpl.new_connection(ip, port, transport, node_public_key)
{:ok, pid} = DefaultImpl.new_connection(ip, port, transport, node_public_key, nil)

# get_state to wait connection process finished
:sys.get_state(pid)
ConnectionSupervisor.set_node_connected(node_public_key)
:ets.insert(@table_name, {node_public_key, true})

assert {:ok, ^pid} = DefaultImpl.new_connection(ip, 3003, transport, node_public_key)
assert {:ok, ^pid} = DefaultImpl.new_connection(ip, 3003, transport, node_public_key, nil)
end

test "should cancel connection and create new one if node not connected and infos are updated" do
ip = {127, 0, 0, 1}
port = 3002
transport = :tcp
node_public_key = Crypto.first_node_public_key()
{:ok, pid1} = DefaultImpl.new_connection(ip, port, transport, node_public_key)
{:ok, pid1} = DefaultImpl.new_connection(ip, port, transport, node_public_key, nil)

# get_state to wait connection process finished
:sys.get_state(pid1)
ConnectionSupervisor.set_node_disconnected(node_public_key)
:ets.insert(@table_name, {node_public_key, false})

{:ok, pid2} = DefaultImpl.new_connection(ip, 3003, transport, node_public_key)
{:ok, pid2} = DefaultImpl.new_connection(ip, 3003, transport, node_public_key, nil)

assert pid1 != pid2
end
Expand Down
9 changes: 7 additions & 2 deletions test/support/template.ex
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,13 @@ defmodule ArchethicCase do
end)

MockClient
|> stub(:new_connection, fn _, _, _, _ ->
{:ok, make_ref()}
|> stub(:new_connection, fn
_, _, _, _, nil ->
{:ok, self()}

_, _, _, _, from ->
send(from, :connected)
{:ok, self()}
end)
|> stub(:send_message, fn
_, %Archethic.P2P.Message.ListNodes{}, _ ->
Expand Down