Skip to content

Commit

Permalink
Make P2P reconnection asynchronous (#721)
Browse files Browse the repository at this point in the history
  • Loading branch information
bchamagne committed Dec 2, 2022
1 parent 911bb4e commit 58ec61e
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 21 deletions.
76 changes: 55 additions & 21 deletions lib/archethic/p2p/client/connection.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
defmodule Archethic.P2P.Client.Connection do
@moduledoc false
@moduledoc """
3 states:
:initializing
{:connected, socket}
:disconnected
we use the :initializing state to be able to postpone calls and casts until after the 1 connect attempt
"""

alias Archethic.Crypto

Expand Down Expand Up @@ -77,8 +85,17 @@ defmodule Archethic.P2P.Client.Connection do
availability_timer: {nil, 0}
}

actions = [{:next_event, :internal, :connect}]
{:ok, :disconnected, data, actions}
{:ok, :initializing, data, [{:next_event, :internal, :connect}]}
end

# every messages sent while inializing will wait until state changes
def handle_event({:call, _}, _action, :initializing, _data) do
{:keep_state_and_data, :postpone}
end

# every messages sent while inializing will wait until state changes
def handle_event(:cast, _action, :initializing, _data) do
{:keep_state_and_data, :postpone}
end

def handle_event(
Expand Down Expand Up @@ -121,8 +138,6 @@ defmodule Archethic.P2P.Client.Connection do
end)}
end

def handle_event(:enter, :disconnected, :disconnected, _data), do: :keep_state_and_data

def handle_event(
:enter,
{:connected, _socket},
Expand Down Expand Up @@ -161,7 +176,7 @@ defmodule Archethic.P2P.Client.Connection do

def handle_event(
:enter,
:disconnected,
_,
{:connected, _socket},
data = %{node_public_key: node_public_key}
) do
Expand All @@ -180,37 +195,41 @@ defmodule Archethic.P2P.Client.Connection do
{:keep_state, new_data}
end

def handle_event(:enter, _old_state, :initializing, _data), do: :keep_state_and_data
def handle_event(:enter, _old_state, :disconnected, _data), do: :keep_state_and_data
def handle_event(:enter, _old_state, {:connected, _socket}, _data), do: :keep_state_and_data

# called from the :disconnected or :initializing state
def handle_event(
:internal,
:connect,
:disconnected,
data = %{
_state,
_data = %{
ip: ip,
port: port,
transport: transport
}
) do
case transport.handle_connect(ip, port) do
{:ok, socket} ->
{:next_state, {:connected, socket}, data}
# try to connect asynchronously so it does not block the messages coming in
# Task.async/1 will send a {:info, {ref, result}} message to the connection process
Task.async(fn ->
transport.handle_connect(ip, port)
end)

{:error, _} ->
actions = [{{:timeout, :reconnect}, 500, nil}]
{:keep_state_and_data, actions}
end
end

def handle_event({:timeout, :reconnect}, _event_data, :disconnected, _data) do
actions = [{:next_event, :internal, :connect}]
{:keep_state_and_data, actions}
:keep_state_and_data
end

# this message is used to delay next connection attempt
def handle_event({:timeout, :reconnect}, _event_data, {:connected, _socket}, _data) do
:keep_state_and_data
end

# this message is used to delay next connection attempt
def handle_event({:timeout, :reconnect}, _event_data, _state, _data) do
actions = [{:next_event, :internal, :connect}]
{:keep_state_and_data, actions}
end

def handle_event(
:cast,
{:send_message, ref, from, _msg, _timeout},
Expand Down Expand Up @@ -364,11 +383,21 @@ defmodule Archethic.P2P.Client.Connection do
end
end

# Task.async tells us the process ended successfully
def handle_event(:info, {:DOWN, _ref, :process, _pid, :normal}, _, _data) do
:keep_state_and_data
end

def handle_event(:info, _event, :disconnected, _data), do: :keep_state_and_data
# Task.async sending us the result of the handle_connect
def handle_event(:info, {_ref, {:ok, socket}}, _, data) do
{:next_state, {:connected, socket}, data}
end

# Task.async sending us the result of the handle_connect
def handle_event(:info, {_ref, {:error, _reason}}, _, data) do
actions = [{{:timeout, :reconnect}, 500, nil}]
{:next_state, :disconnected, data, actions}
end

def handle_event(
:info,
Expand Down Expand Up @@ -444,5 +473,10 @@ defmodule Archethic.P2P.Client.Connection do
end
end

def handle_event(:info, _, _, _data) do
# Unhandled message received
:keep_state_and_data
end

def code_change(_old_vsn, state, data, _extra), do: {:ok, state, data}
end
126 changes: 126 additions & 0 deletions test/archethic/p2p/client/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ defmodule Archethic.P2P.Client.ConnectionTest do
node_public_key: "key1"
)

assert {:initializing, _} = :sys.get_state(pid)

Process.sleep(10)

assert {{:connected, _socket}, %{request_id: 0, messages: %{}}} = :sys.get_state(pid)
end

Expand Down Expand Up @@ -45,6 +49,122 @@ defmodule Archethic.P2P.Client.ConnectionTest do
}} = :sys.get_state(pid)
end

test "should get an error, :closed when trying to reach an unreachable node" do
defmodule MockTransportUnreachable do
alias Archethic.P2P.Client.Transport

@behaviour Transport

def handle_connect(_ip, _port) do
{:error, :timeout}
end

def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok

def handle_message({_, _, _}), do: {:error, :closed}
end

{:ok, _} =
Connection.start_link(
transport: MockTransportUnreachable,
ip: {127, 0, 0, 2},
port: 3000,
node_public_key: Crypto.first_node_public_key()
)

assert {:error, :closed} =
Connection.send_message(
Crypto.first_node_public_key(),
%GetBalance{address: <<0::8, :crypto.strong_rand_bytes(32)::binary>>}
)
end

test "reconnection should be asynchronous" do
defmodule MockTransportConnectionTimeout do
alias Archethic.P2P.Client.Transport

@behaviour Transport

def handle_connect({127, 0, 0, 1}, _port) do
{:error, :timeout}
end

def handle_connect({127, 0, 0, 2}, _port) do
Process.sleep(100_000)
{:error, :timeout}
end

def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok

def handle_message({_, _, _}), do: {:error, :closed}
end

{:ok, pid} =
Connection.start_link(
transport: MockTransportConnectionTimeout,
ip: {127, 0, 0, 1},
port: 3000,
node_public_key: Crypto.first_node_public_key()
)

:sys.replace_state(pid, fn {state, data} ->
{state, Map.put(data, :ip, {127, 0, 0, 2})}
end)

# 500ms to wait for the 1st reconnect attempt
Process.sleep(550)

time = System.monotonic_time(:millisecond)

assert {:error, :closed} =
Connection.send_message(
Crypto.first_node_public_key(),
%GetBalance{address: <<0::8, :crypto.strong_rand_bytes(32)::binary>>},
200
)

# ensure there was no delay
time2 = System.monotonic_time(:millisecond)
assert time2 - time < 100
end

test "should be in :connected state after reconnection" do
defmodule MockTransportReconnectionSuccess do
alias Archethic.P2P.Client.Transport

@behaviour Transport

def handle_connect({127, 0, 0, 1}, _port) do
{:error, :timeout}
end

def handle_connect({127, 0, 0, 2}, _port) do
{:ok, make_ref()}
end

def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok

def handle_message({_, _, _}), do: {:error, :closed}
end

{:ok, pid} =
Connection.start_link(
transport: MockTransportReconnectionSuccess,
ip: {127, 0, 0, 1},
port: 3000,
node_public_key: Crypto.first_node_public_key()
)

:sys.replace_state(pid, fn {state, data} ->
{state, Map.put(data, :ip, {127, 0, 0, 2})}
end)

# 500ms to wait for the 1st reconnect attempt
Process.sleep(550)

assert {{:connected, _socket}, _} = :sys.get_state(pid)
end

test "should get an error when the timeout is reached" do
{:ok, pid} =
Connection.start_link(
Expand Down Expand Up @@ -234,6 +354,9 @@ defmodule Archethic.P2P.Client.ConnectionTest do
node_public_key: "key1"
)

assert {:initializing, _} = :sys.get_state(pid)
Process.sleep(10)

assert {{:connected, _socket}, %{availability_timer: {start, 0}}} = :sys.get_state(pid)
assert start != nil
end
Expand Down Expand Up @@ -299,6 +422,9 @@ defmodule Archethic.P2P.Client.ConnectionTest do
node_public_key: Crypto.first_node_public_key()
)

assert {:initializing, _} = :sys.get_state(pid)
Process.sleep(10)

assert {{:connected, _socket}, %{availability_timer: {start, 0}}} = :sys.get_state(pid)
assert start != nil

Expand Down

0 comments on commit 58ec61e

Please sign in to comment.