diff --git a/lib/archethic/p2p/client/connection.ex b/lib/archethic/p2p/client/connection.ex index 319c0f167..dfd67d7fb 100644 --- a/lib/archethic/p2p/client/connection.ex +++ b/lib/archethic/p2p/client/connection.ex @@ -1,5 +1,13 @@ defmodule Archethic.P2P.Client.Connection do - @moduledoc false + @moduledoc """ + + 3 states: + :initializing + {:connected, socket} + :disconnected + + we use the :initializing state to be able to postpone calls and casts until after the 1 connect attempt + """ alias Archethic.Crypto @@ -77,8 +85,17 @@ defmodule Archethic.P2P.Client.Connection do availability_timer: {nil, 0} } - actions = [{:next_event, :internal, :connect}] - {:ok, :disconnected, data, actions} + {:ok, :initializing, data, [{:next_event, :internal, :connect}]} + end + + # every messages sent while inializing will wait until state changes + def handle_event({:call, _}, _action, :initializing, _data) do + {:keep_state_and_data, :postpone} + end + + # every messages sent while inializing will wait until state changes + def handle_event(:cast, _action, :initializing, _data) do + {:keep_state_and_data, :postpone} end def handle_event( @@ -121,8 +138,6 @@ defmodule Archethic.P2P.Client.Connection do end)} end - def handle_event(:enter, :disconnected, :disconnected, _data), do: :keep_state_and_data - def handle_event( :enter, {:connected, _socket}, @@ -161,7 +176,7 @@ defmodule Archethic.P2P.Client.Connection do def handle_event( :enter, - :disconnected, + _, {:connected, _socket}, data = %{node_public_key: node_public_key} ) do @@ -180,37 +195,41 @@ defmodule Archethic.P2P.Client.Connection do {:keep_state, new_data} end + def handle_event(:enter, _old_state, :initializing, _data), do: :keep_state_and_data + def handle_event(:enter, _old_state, :disconnected, _data), do: :keep_state_and_data def handle_event(:enter, _old_state, {:connected, _socket}, _data), do: :keep_state_and_data + # called from the :disconnected or :initializing state def handle_event( :internal, :connect, - :disconnected, - data = %{ + _state, + _data = %{ ip: ip, port: port, transport: transport } ) do - case transport.handle_connect(ip, port) do - {:ok, socket} -> - {:next_state, {:connected, socket}, data} + # try to connect asynchronously so it does not block the messages coming in + # Task.async/1 will send a {:info, {ref, result}} message to the connection process + Task.async(fn -> + transport.handle_connect(ip, port) + end) - {:error, _} -> - actions = [{{:timeout, :reconnect}, 500, nil}] - {:keep_state_and_data, actions} - end - end - - def handle_event({:timeout, :reconnect}, _event_data, :disconnected, _data) do - actions = [{:next_event, :internal, :connect}] - {:keep_state_and_data, actions} + :keep_state_and_data end + # this message is used to delay next connection attempt def handle_event({:timeout, :reconnect}, _event_data, {:connected, _socket}, _data) do :keep_state_and_data end + # this message is used to delay next connection attempt + def handle_event({:timeout, :reconnect}, _event_data, _state, _data) do + actions = [{:next_event, :internal, :connect}] + {:keep_state_and_data, actions} + end + def handle_event( :cast, {:send_message, ref, from, _msg, _timeout}, @@ -364,11 +383,21 @@ defmodule Archethic.P2P.Client.Connection do end end + # Task.async tells us the process ended successfully def handle_event(:info, {:DOWN, _ref, :process, _pid, :normal}, _, _data) do :keep_state_and_data end - def handle_event(:info, _event, :disconnected, _data), do: :keep_state_and_data + # Task.async sending us the result of the handle_connect + def handle_event(:info, {_ref, {:ok, socket}}, _, data) do + {:next_state, {:connected, socket}, data} + end + + # Task.async sending us the result of the handle_connect + def handle_event(:info, {_ref, {:error, _reason}}, _, data) do + actions = [{{:timeout, :reconnect}, 500, nil}] + {:next_state, :disconnected, data, actions} + end def handle_event( :info, @@ -444,5 +473,10 @@ defmodule Archethic.P2P.Client.Connection do end end + def handle_event(:info, _, _, _data) do + # Unhandled message received + :keep_state_and_data + end + def code_change(_old_vsn, state, data, _extra), do: {:ok, state, data} end diff --git a/test/archethic/p2p/client/connection_test.exs b/test/archethic/p2p/client/connection_test.exs index e28de8b66..4513ae49b 100644 --- a/test/archethic/p2p/client/connection_test.exs +++ b/test/archethic/p2p/client/connection_test.exs @@ -17,6 +17,10 @@ defmodule Archethic.P2P.Client.ConnectionTest do node_public_key: "key1" ) + assert {:initializing, _} = :sys.get_state(pid) + + Process.sleep(10) + assert {{:connected, _socket}, %{request_id: 0, messages: %{}}} = :sys.get_state(pid) end @@ -45,6 +49,122 @@ defmodule Archethic.P2P.Client.ConnectionTest do }} = :sys.get_state(pid) end + test "should get an error, :closed when trying to reach an unreachable node" do + defmodule MockTransportUnreachable do + alias Archethic.P2P.Client.Transport + + @behaviour Transport + + def handle_connect(_ip, _port) do + {:error, :timeout} + end + + def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok + + def handle_message({_, _, _}), do: {:error, :closed} + end + + {:ok, _} = + Connection.start_link( + transport: MockTransportUnreachable, + ip: {127, 0, 0, 2}, + port: 3000, + node_public_key: Crypto.first_node_public_key() + ) + + assert {:error, :closed} = + Connection.send_message( + Crypto.first_node_public_key(), + %GetBalance{address: <<0::8, :crypto.strong_rand_bytes(32)::binary>>} + ) + end + + test "reconnection should be asynchronous" do + defmodule MockTransportConnectionTimeout do + alias Archethic.P2P.Client.Transport + + @behaviour Transport + + def handle_connect({127, 0, 0, 1}, _port) do + {:error, :timeout} + end + + def handle_connect({127, 0, 0, 2}, _port) do + Process.sleep(100_000) + {:error, :timeout} + end + + def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok + + def handle_message({_, _, _}), do: {:error, :closed} + end + + {:ok, pid} = + Connection.start_link( + transport: MockTransportConnectionTimeout, + ip: {127, 0, 0, 1}, + port: 3000, + node_public_key: Crypto.first_node_public_key() + ) + + :sys.replace_state(pid, fn {state, data} -> + {state, Map.put(data, :ip, {127, 0, 0, 2})} + end) + + # 500ms to wait for the 1st reconnect attempt + Process.sleep(550) + + time = System.monotonic_time(:millisecond) + + assert {:error, :closed} = + Connection.send_message( + Crypto.first_node_public_key(), + %GetBalance{address: <<0::8, :crypto.strong_rand_bytes(32)::binary>>}, + 200 + ) + + # ensure there was no delay + time2 = System.monotonic_time(:millisecond) + assert time2 - time < 100 + end + + test "should be in :connected state after reconnection" do + defmodule MockTransportReconnectionSuccess do + alias Archethic.P2P.Client.Transport + + @behaviour Transport + + def handle_connect({127, 0, 0, 1}, _port) do + {:error, :timeout} + end + + def handle_connect({127, 0, 0, 2}, _port) do + {:ok, make_ref()} + end + + def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok + + def handle_message({_, _, _}), do: {:error, :closed} + end + + {:ok, pid} = + Connection.start_link( + transport: MockTransportReconnectionSuccess, + ip: {127, 0, 0, 1}, + port: 3000, + node_public_key: Crypto.first_node_public_key() + ) + + :sys.replace_state(pid, fn {state, data} -> + {state, Map.put(data, :ip, {127, 0, 0, 2})} + end) + + # 500ms to wait for the 1st reconnect attempt + Process.sleep(550) + + assert {{:connected, _socket}, _} = :sys.get_state(pid) + end + test "should get an error when the timeout is reached" do {:ok, pid} = Connection.start_link( @@ -234,6 +354,9 @@ defmodule Archethic.P2P.Client.ConnectionTest do node_public_key: "key1" ) + assert {:initializing, _} = :sys.get_state(pid) + Process.sleep(10) + assert {{:connected, _socket}, %{availability_timer: {start, 0}}} = :sys.get_state(pid) assert start != nil end @@ -299,6 +422,9 @@ defmodule Archethic.P2P.Client.ConnectionTest do node_public_key: Crypto.first_node_public_key() ) + assert {:initializing, _} = :sys.get_state(pid) + Process.sleep(10) + assert {{:connected, _socket}, %{availability_timer: {start, 0}}} = :sys.get_state(pid) assert start != nil