Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection's reconnect is asynchronous #721

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 55 additions & 21 deletions lib/archethic/p2p/client/connection.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
defmodule Archethic.P2P.Client.Connection do
@moduledoc false
@moduledoc """

3 states:
:initializing
{:connected, socket}
:disconnected

we use the :initializing state to be able to postpone calls and casts until after the 1 connect attempt
"""

alias Archethic.Crypto

Expand Down Expand Up @@ -77,8 +85,17 @@ defmodule Archethic.P2P.Client.Connection do
availability_timer: {nil, 0}
}

actions = [{:next_event, :internal, :connect}]
{:ok, :disconnected, data, actions}
{:ok, :initializing, data, [{:next_event, :internal, :connect}]}
end

# every messages sent while inializing will wait until state changes
def handle_event({:call, _}, _action, :initializing, _data) do
{:keep_state_and_data, :postpone}
end

# every messages sent while inializing will wait until state changes
def handle_event(:cast, _action, :initializing, _data) do
{:keep_state_and_data, :postpone}
end

def handle_event(
Expand Down Expand Up @@ -121,8 +138,6 @@ defmodule Archethic.P2P.Client.Connection do
end)}
end

def handle_event(:enter, :disconnected, :disconnected, _data), do: :keep_state_and_data

def handle_event(
:enter,
{:connected, _socket},
Expand Down Expand Up @@ -161,7 +176,7 @@ defmodule Archethic.P2P.Client.Connection do

def handle_event(
:enter,
:disconnected,
_,
{:connected, _socket},
data = %{node_public_key: node_public_key}
) do
Expand All @@ -180,37 +195,41 @@ defmodule Archethic.P2P.Client.Connection do
{:keep_state, new_data}
end

def handle_event(:enter, _old_state, :initializing, _data), do: :keep_state_and_data
def handle_event(:enter, _old_state, :disconnected, _data), do: :keep_state_and_data
def handle_event(:enter, _old_state, {:connected, _socket}, _data), do: :keep_state_and_data

# called from the :disconnected or :initializing state
def handle_event(
:internal,
:connect,
:disconnected,
data = %{
_state,
_data = %{
ip: ip,
port: port,
transport: transport
}
) do
case transport.handle_connect(ip, port) do
{:ok, socket} ->
{:next_state, {:connected, socket}, data}
# try to connect asynchronously so it does not block the messages coming in
# Task.async/1 will send a {:info, {ref, result}} message to the connection process
Task.async(fn ->
transport.handle_connect(ip, port)
end)

{:error, _} ->
actions = [{{:timeout, :reconnect}, 500, nil}]
{:keep_state_and_data, actions}
end
end

def handle_event({:timeout, :reconnect}, _event_data, :disconnected, _data) do
actions = [{:next_event, :internal, :connect}]
{:keep_state_and_data, actions}
:keep_state_and_data
end

# this message is used to delay next connection attempt
def handle_event({:timeout, :reconnect}, _event_data, {:connected, _socket}, _data) do
:keep_state_and_data
end

# this message is used to delay next connection attempt
def handle_event({:timeout, :reconnect}, _event_data, _state, _data) do
actions = [{:next_event, :internal, :connect}]
{:keep_state_and_data, actions}
end

def handle_event(
:cast,
{:send_message, ref, from, _msg, _timeout},
Expand Down Expand Up @@ -364,11 +383,21 @@ defmodule Archethic.P2P.Client.Connection do
end
end

# Task.async tells us the process ended successfully
def handle_event(:info, {:DOWN, _ref, :process, _pid, :normal}, _, _data) do
:keep_state_and_data
end

def handle_event(:info, _event, :disconnected, _data), do: :keep_state_and_data
# Task.async sending us the result of the handle_connect
def handle_event(:info, {_ref, {:ok, socket}}, _, data) do
{:next_state, {:connected, socket}, data}
end

# Task.async sending us the result of the handle_connect
def handle_event(:info, {_ref, {:error, _reason}}, _, data) do
actions = [{{:timeout, :reconnect}, 500, nil}]
{:next_state, :disconnected, data, actions}
end

def handle_event(
:info,
Expand Down Expand Up @@ -444,5 +473,10 @@ defmodule Archethic.P2P.Client.Connection do
end
end

def handle_event(:info, _, _, _data) do
# Unhandled message received
:keep_state_and_data
end

def code_change(_old_vsn, state, data, _extra), do: {:ok, state, data}
end
126 changes: 126 additions & 0 deletions test/archethic/p2p/client/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ defmodule Archethic.P2P.Client.ConnectionTest do
node_public_key: "key1"
)

assert {:initializing, _} = :sys.get_state(pid)

Process.sleep(10)

assert {{:connected, _socket}, %{request_id: 0, messages: %{}}} = :sys.get_state(pid)
end

Expand Down Expand Up @@ -45,6 +49,122 @@ defmodule Archethic.P2P.Client.ConnectionTest do
}} = :sys.get_state(pid)
end

test "should get an error, :closed when trying to reach an unreachable node" do
defmodule MockTransportUnreachable do
alias Archethic.P2P.Client.Transport

@behaviour Transport

def handle_connect(_ip, _port) do
{:error, :timeout}
bchamagne marked this conversation as resolved.
Show resolved Hide resolved
end

def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok

def handle_message({_, _, _}), do: {:error, :closed}
end

{:ok, _} =
Connection.start_link(
transport: MockTransportUnreachable,
ip: {127, 0, 0, 2},
port: 3000,
node_public_key: Crypto.first_node_public_key()
)

assert {:error, :closed} =
Connection.send_message(
Crypto.first_node_public_key(),
%GetBalance{address: <<0::8, :crypto.strong_rand_bytes(32)::binary>>}
)
end

test "reconnection should be asynchronous" do
defmodule MockTransportConnectionTimeout do
alias Archethic.P2P.Client.Transport

@behaviour Transport

def handle_connect({127, 0, 0, 1}, _port) do
{:error, :timeout}
end

def handle_connect({127, 0, 0, 2}, _port) do
Process.sleep(100_000)
{:error, :timeout}
end

def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok

def handle_message({_, _, _}), do: {:error, :closed}
end

{:ok, pid} =
Connection.start_link(
transport: MockTransportConnectionTimeout,
ip: {127, 0, 0, 1},
port: 3000,
node_public_key: Crypto.first_node_public_key()
)

:sys.replace_state(pid, fn {state, data} ->
{state, Map.put(data, :ip, {127, 0, 0, 2})}
end)

# 500ms to wait for the 1st reconnect attempt
Process.sleep(550)

time = System.monotonic_time(:millisecond)

assert {:error, :closed} =
Connection.send_message(
Crypto.first_node_public_key(),
%GetBalance{address: <<0::8, :crypto.strong_rand_bytes(32)::binary>>},
200
)

# ensure there was no delay
time2 = System.monotonic_time(:millisecond)
assert time2 - time < 100
end

test "should be in :connected state after reconnection" do
defmodule MockTransportReconnectionSuccess do
alias Archethic.P2P.Client.Transport

@behaviour Transport

def handle_connect({127, 0, 0, 1}, _port) do
{:error, :timeout}
end

def handle_connect({127, 0, 0, 2}, _port) do
{:ok, make_ref()}
end

def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok

def handle_message({_, _, _}), do: {:error, :closed}
end

{:ok, pid} =
Connection.start_link(
transport: MockTransportReconnectionSuccess,
ip: {127, 0, 0, 1},
port: 3000,
node_public_key: Crypto.first_node_public_key()
)

:sys.replace_state(pid, fn {state, data} ->
{state, Map.put(data, :ip, {127, 0, 0, 2})}
end)

# 500ms to wait for the 1st reconnect attempt
Process.sleep(550)

assert {{:connected, _socket}, _} = :sys.get_state(pid)
end

test "should get an error when the timeout is reached" do
{:ok, pid} =
Connection.start_link(
Expand Down Expand Up @@ -234,6 +354,9 @@ defmodule Archethic.P2P.Client.ConnectionTest do
node_public_key: "key1"
)

assert {:initializing, _} = :sys.get_state(pid)
Process.sleep(10)

assert {{:connected, _socket}, %{availability_timer: {start, 0}}} = :sys.get_state(pid)
assert start != nil
end
Expand Down Expand Up @@ -299,6 +422,9 @@ defmodule Archethic.P2P.Client.ConnectionTest do
node_public_key: Crypto.first_node_public_key()
)

assert {:initializing, _} = :sys.get_state(pid)
Process.sleep(10)

assert {{:connected, _socket}, %{availability_timer: {start, 0}}} = :sys.get_state(pid)
assert start != nil

Expand Down