From a44e0ae1b0b432163846290259221973088be7e0 Mon Sep 17 00:00:00 2001 From: Bastien CHAMAGNE Date: Mon, 28 Nov 2022 11:47:03 +0100 Subject: [PATCH 1/3] Connection's connect is now async --- lib/archethic/p2p/client/connection.ex | 44 ++++++++++++++----- test/archethic/p2p/client/connection_test.exs | 30 +++++++++++++ 2 files changed, 64 insertions(+), 10 deletions(-) diff --git a/lib/archethic/p2p/client/connection.ex b/lib/archethic/p2p/client/connection.ex index 319c0f167..9f27dc14a 100644 --- a/lib/archethic/p2p/client/connection.ex +++ b/lib/archethic/p2p/client/connection.ex @@ -77,8 +77,20 @@ defmodule Archethic.P2P.Client.Connection do availability_timer: {nil, 0} } - actions = [{:next_event, :internal, :connect}] - {:ok, :disconnected, data, actions} + # First connection attempt is synchronous (to deal with the `start_link` followed by a `send_message`) + # We call ourselves the `state_enter` because `GenStateMachine` will not do it from init + # `handle_connect` might take a long time + case transport.handle_connect(ip, port) do + {:ok, socket} -> + {_, new_data} = handle_event(:enter, :disconnected, {:connected, socket}, data) + {:ok, {:connected, socket}, new_data} + + {:error, _} -> + {_, new_data, actions} = + handle_event(:enter, {:connected, make_ref()}, :disconnected, data) + + {:ok, :disconnected, new_data, actions} + end end def handle_event( @@ -186,20 +198,21 @@ defmodule Archethic.P2P.Client.Connection do :internal, :connect, :disconnected, - data = %{ + _data = %{ ip: ip, port: port, transport: transport } ) do - case transport.handle_connect(ip, port) do - {:ok, socket} -> - {:next_state, {:connected, socket}, data} + me = self() - {:error, _} -> - actions = [{{:timeout, :reconnect}, 500, nil}] - {:keep_state_and_data, actions} - end + # try to connect asynchronously so it does not block the messages coming in + Task.start_link(fn -> + # handle_connect might take a long time + send(me, {:connect_result, transport.handle_connect(ip, port)}) + end) + + :keep_state_and_data end def handle_event({:timeout, :reconnect}, _event_data, :disconnected, _data) do @@ -364,6 +377,17 @@ defmodule Archethic.P2P.Client.Connection do end end + def handle_event(:info, {:connect_result, result}, _state, data) do + case result do + {:ok, socket} -> + {:next_state, {:connected, socket}, data} + + {:error, _} -> + actions = [{{:timeout, :reconnect}, 500, nil}] + {:keep_state_and_data, actions} + end + end + def handle_event(:info, {:DOWN, _ref, :process, _pid, :normal}, _, _data) do :keep_state_and_data end diff --git a/test/archethic/p2p/client/connection_test.exs b/test/archethic/p2p/client/connection_test.exs index e28de8b66..aa2835943 100644 --- a/test/archethic/p2p/client/connection_test.exs +++ b/test/archethic/p2p/client/connection_test.exs @@ -45,6 +45,36 @@ defmodule Archethic.P2P.Client.ConnectionTest do }} = :sys.get_state(pid) end + test "should get an error, :closed when trying to reach an unreachable node" do + defmodule MockTransportUnreachable do + alias Archethic.P2P.Client.Transport + + @behaviour Transport + + def handle_connect(_ip, _port) do + {:error, :timeout} + end + + def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok + + def handle_message({_, _, _}), do: {:error, :closed} + end + + {:ok, pid} = + Connection.start_link( + transport: MockTransportUnreachable, + ip: {127, 0, 0, 2}, + port: 3000, + node_public_key: Crypto.first_node_public_key() + ) + + assert {:error, :closed} = + Connection.send_message( + Crypto.first_node_public_key(), + %GetBalance{address: <<0::8, :crypto.strong_rand_bytes(32)::binary>>} + ) + end + test "should get an error when the timeout is reached" do {:ok, pid} = Connection.start_link( From 072fe2a4f18030a0122cef571dd44bcdf743f0d5 Mon Sep 17 00:00:00 2001 From: Bastien CHAMAGNE Date: Wed, 30 Nov 2022 16:08:19 +0100 Subject: [PATCH 2/3] Fix reconnection workflow --- lib/archethic/p2p/client/connection.ex | 39 ++++---- test/archethic/p2p/client/connection_test.exs | 88 ++++++++++++++++++- 2 files changed, 105 insertions(+), 22 deletions(-) diff --git a/lib/archethic/p2p/client/connection.ex b/lib/archethic/p2p/client/connection.ex index 9f27dc14a..98cf09264 100644 --- a/lib/archethic/p2p/client/connection.ex +++ b/lib/archethic/p2p/client/connection.ex @@ -86,10 +86,8 @@ defmodule Archethic.P2P.Client.Connection do {:ok, {:connected, socket}, new_data} {:error, _} -> - {_, new_data, actions} = - handle_event(:enter, {:connected, make_ref()}, :disconnected, data) - - {:ok, :disconnected, new_data, actions} + actions = [{{:timeout, :reconnect}, 500, nil}] + {:ok, :disconnected, data, actions} end end @@ -204,12 +202,10 @@ defmodule Archethic.P2P.Client.Connection do transport: transport } ) do - me = self() - # try to connect asynchronously so it does not block the messages coming in - Task.start_link(fn -> - # handle_connect might take a long time - send(me, {:connect_result, transport.handle_connect(ip, port)}) + # Task.async/1 will send a {:info, {ref, result}} message to the connection process + Task.async(fn -> + transport.handle_connect(ip, port) end) :keep_state_and_data @@ -377,22 +373,18 @@ defmodule Archethic.P2P.Client.Connection do end end - def handle_event(:info, {:connect_result, result}, _state, data) do - case result do - {:ok, socket} -> - {:next_state, {:connected, socket}, data} - - {:error, _} -> - actions = [{{:timeout, :reconnect}, 500, nil}] - {:keep_state_and_data, actions} - end - end - def handle_event(:info, {:DOWN, _ref, :process, _pid, :normal}, _, _data) do :keep_state_and_data end - def handle_event(:info, _event, :disconnected, _data), do: :keep_state_and_data + def handle_event(:info, {_ref, {:ok, socket}}, :disconnected, data) do + {:next_state, {:connected, socket}, data} + end + + def handle_event(:info, {_ref, {:error, _reason}}, :disconnected, _data) do + actions = [{{:timeout, :reconnect}, 500, nil}] + {:keep_state_and_date, actions} + end def handle_event( :info, @@ -468,5 +460,10 @@ defmodule Archethic.P2P.Client.Connection do end end + def handle_event(:info, _, _, _data) do + # Unhandled message received + :keep_state_and_data + end + def code_change(_old_vsn, state, data, _extra), do: {:ok, state, data} end diff --git a/test/archethic/p2p/client/connection_test.exs b/test/archethic/p2p/client/connection_test.exs index aa2835943..12b37bc30 100644 --- a/test/archethic/p2p/client/connection_test.exs +++ b/test/archethic/p2p/client/connection_test.exs @@ -60,7 +60,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do def handle_message({_, _, _}), do: {:error, :closed} end - {:ok, pid} = + {:ok, _} = Connection.start_link( transport: MockTransportUnreachable, ip: {127, 0, 0, 2}, @@ -75,6 +75,92 @@ defmodule Archethic.P2P.Client.ConnectionTest do ) end + test "reconnection should be asynchronous" do + defmodule MockTransportConnectionTimeout do + alias Archethic.P2P.Client.Transport + + @behaviour Transport + + def handle_connect({127, 0, 0, 1}, _port) do + {:error, :timeout} + end + + def handle_connect({127, 0, 0, 2}, _port) do + Process.sleep(100_000) + {:error, :timeout} + end + + def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok + + def handle_message({_, _, _}), do: {:error, :closed} + end + + {:ok, pid} = + Connection.start_link( + transport: MockTransportConnectionTimeout, + ip: {127, 0, 0, 1}, + port: 3000, + node_public_key: Crypto.first_node_public_key() + ) + + :sys.replace_state(pid, fn {state, data} -> + {state, Map.put(data, :ip, {127, 0, 0, 2})} + end) + + # 500ms to wait for the 1st reconnect attempt + Process.sleep(550) + + time = System.monotonic_time(:millisecond) + + assert {:error, :closed} = + Connection.send_message( + Crypto.first_node_public_key(), + %GetBalance{address: <<0::8, :crypto.strong_rand_bytes(32)::binary>>}, + 200 + ) + + # ensure there was no delay + time2 = System.monotonic_time(:millisecond) + assert time2 - time < 100 + end + + test "should be in :connected state after reconnection" do + defmodule MockTransportReconnectionSuccess do + alias Archethic.P2P.Client.Transport + + @behaviour Transport + + def handle_connect({127, 0, 0, 1}, _port) do + {:error, :timeout} + end + + def handle_connect({127, 0, 0, 2}, _port) do + {:ok, make_ref()} + end + + def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok + + def handle_message({_, _, _}), do: {:error, :closed} + end + + {:ok, pid} = + Connection.start_link( + transport: MockTransportReconnectionSuccess, + ip: {127, 0, 0, 1}, + port: 3000, + node_public_key: Crypto.first_node_public_key() + ) + + :sys.replace_state(pid, fn {state, data} -> + {state, Map.put(data, :ip, {127, 0, 0, 2})} + end) + + # 500ms to wait for the 1st reconnect attempt + Process.sleep(550) + + assert {{:connected, _socket}, _} = :sys.get_state(pid) + end + test "should get an error when the timeout is reached" do {:ok, pid} = Connection.start_link( From 96d6f07f49189b6b856f3c545f0e556cc3fa62de Mon Sep 17 00:00:00 2001 From: Bastien CHAMAGNE Date: Fri, 2 Dec 2022 11:14:56 +0100 Subject: [PATCH 3/3] refactor the connection using the postpone feature --- lib/archethic/p2p/client/connection.ex | 63 +++++++++++-------- test/archethic/p2p/client/connection_test.exs | 10 +++ 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/lib/archethic/p2p/client/connection.ex b/lib/archethic/p2p/client/connection.ex index 98cf09264..dfd67d7fb 100644 --- a/lib/archethic/p2p/client/connection.ex +++ b/lib/archethic/p2p/client/connection.ex @@ -1,5 +1,13 @@ defmodule Archethic.P2P.Client.Connection do - @moduledoc false + @moduledoc """ + + 3 states: + :initializing + {:connected, socket} + :disconnected + + we use the :initializing state to be able to postpone calls and casts until after the 1 connect attempt + """ alias Archethic.Crypto @@ -77,18 +85,17 @@ defmodule Archethic.P2P.Client.Connection do availability_timer: {nil, 0} } - # First connection attempt is synchronous (to deal with the `start_link` followed by a `send_message`) - # We call ourselves the `state_enter` because `GenStateMachine` will not do it from init - # `handle_connect` might take a long time - case transport.handle_connect(ip, port) do - {:ok, socket} -> - {_, new_data} = handle_event(:enter, :disconnected, {:connected, socket}, data) - {:ok, {:connected, socket}, new_data} - - {:error, _} -> - actions = [{{:timeout, :reconnect}, 500, nil}] - {:ok, :disconnected, data, actions} - end + {:ok, :initializing, data, [{:next_event, :internal, :connect}]} + end + + # every messages sent while inializing will wait until state changes + def handle_event({:call, _}, _action, :initializing, _data) do + {:keep_state_and_data, :postpone} + end + + # every messages sent while inializing will wait until state changes + def handle_event(:cast, _action, :initializing, _data) do + {:keep_state_and_data, :postpone} end def handle_event( @@ -131,8 +138,6 @@ defmodule Archethic.P2P.Client.Connection do end)} end - def handle_event(:enter, :disconnected, :disconnected, _data), do: :keep_state_and_data - def handle_event( :enter, {:connected, _socket}, @@ -171,7 +176,7 @@ defmodule Archethic.P2P.Client.Connection do def handle_event( :enter, - :disconnected, + _, {:connected, _socket}, data = %{node_public_key: node_public_key} ) do @@ -190,12 +195,15 @@ defmodule Archethic.P2P.Client.Connection do {:keep_state, new_data} end + def handle_event(:enter, _old_state, :initializing, _data), do: :keep_state_and_data + def handle_event(:enter, _old_state, :disconnected, _data), do: :keep_state_and_data def handle_event(:enter, _old_state, {:connected, _socket}, _data), do: :keep_state_and_data + # called from the :disconnected or :initializing state def handle_event( :internal, :connect, - :disconnected, + _state, _data = %{ ip: ip, port: port, @@ -211,15 +219,17 @@ defmodule Archethic.P2P.Client.Connection do :keep_state_and_data end - def handle_event({:timeout, :reconnect}, _event_data, :disconnected, _data) do - actions = [{:next_event, :internal, :connect}] - {:keep_state_and_data, actions} - end - + # this message is used to delay next connection attempt def handle_event({:timeout, :reconnect}, _event_data, {:connected, _socket}, _data) do :keep_state_and_data end + # this message is used to delay next connection attempt + def handle_event({:timeout, :reconnect}, _event_data, _state, _data) do + actions = [{:next_event, :internal, :connect}] + {:keep_state_and_data, actions} + end + def handle_event( :cast, {:send_message, ref, from, _msg, _timeout}, @@ -373,17 +383,20 @@ defmodule Archethic.P2P.Client.Connection do end end + # Task.async tells us the process ended successfully def handle_event(:info, {:DOWN, _ref, :process, _pid, :normal}, _, _data) do :keep_state_and_data end - def handle_event(:info, {_ref, {:ok, socket}}, :disconnected, data) do + # Task.async sending us the result of the handle_connect + def handle_event(:info, {_ref, {:ok, socket}}, _, data) do {:next_state, {:connected, socket}, data} end - def handle_event(:info, {_ref, {:error, _reason}}, :disconnected, _data) do + # Task.async sending us the result of the handle_connect + def handle_event(:info, {_ref, {:error, _reason}}, _, data) do actions = [{{:timeout, :reconnect}, 500, nil}] - {:keep_state_and_date, actions} + {:next_state, :disconnected, data, actions} end def handle_event( diff --git a/test/archethic/p2p/client/connection_test.exs b/test/archethic/p2p/client/connection_test.exs index 12b37bc30..4513ae49b 100644 --- a/test/archethic/p2p/client/connection_test.exs +++ b/test/archethic/p2p/client/connection_test.exs @@ -17,6 +17,10 @@ defmodule Archethic.P2P.Client.ConnectionTest do node_public_key: "key1" ) + assert {:initializing, _} = :sys.get_state(pid) + + Process.sleep(10) + assert {{:connected, _socket}, %{request_id: 0, messages: %{}}} = :sys.get_state(pid) end @@ -350,6 +354,9 @@ defmodule Archethic.P2P.Client.ConnectionTest do node_public_key: "key1" ) + assert {:initializing, _} = :sys.get_state(pid) + Process.sleep(10) + assert {{:connected, _socket}, %{availability_timer: {start, 0}}} = :sys.get_state(pid) assert start != nil end @@ -415,6 +422,9 @@ defmodule Archethic.P2P.Client.ConnectionTest do node_public_key: Crypto.first_node_public_key() ) + assert {:initializing, _} = :sys.get_state(pid) + Process.sleep(10) + assert {{:connected, _socket}, %{availability_timer: {start, 0}}} = :sys.get_state(pid) assert start != nil