diff --git a/lib/archethic/p2p/client.ex b/lib/archethic/p2p/client.ex index beddd1e49..4f11a0fc3 100644 --- a/lib/archethic/p2p/client.ex +++ b/lib/archethic/p2p/client.ex @@ -22,4 +22,6 @@ defmodule Archethic.P2P.Client do {:ok, Message.response()} | {:error, :timeout} | {:error, :closed} + + @callback set_connected(Crypto.key()) :: :ok end diff --git a/lib/archethic/p2p/client/connection.ex b/lib/archethic/p2p/client/connection.ex index ce03a92f7..743801aff 100644 --- a/lib/archethic/p2p/client/connection.ex +++ b/lib/archethic/p2p/client/connection.ex @@ -48,6 +48,22 @@ defmodule Archethic.P2P.Client.Connection do end end + @doc """ + Set the sate to connected + """ + @spec set_connected(Crypto.key()) :: :ok + def set_connected(public_key) do + GenStateMachine.cast(via_tuple(public_key), {:connect, self()}) + + receive do + :ok -> + :ok + after + 500 -> + :ok + end + end + # fetch cnnoection details from registery for a node from its public key defp via_tuple(public_key), do: {:via, Registry, {ConnectionRegistry, public_key}} @@ -127,6 +143,39 @@ defmodule Archethic.P2P.Client.Connection do {:keep_state_and_data, actions} end + def handle_event({:timeout, :reconnect}, _event_data, {:connected, _socket}, _data) do + :keep_state_and_data + end + + def handle_event( + :cast, + {:connect, from}, + state, + data = %{ + ip: ip, + port: port, + transport: transport + } + ) do + next_state = + case state do + :disconnected -> + case transport.handle_connect(ip, port) do + {:ok, socket} -> + {:next_state, {:connected, socket}, data} + + {:error, _reason} -> + :keep_state_and_data + end + + _ -> + :keep_state_and_data + end + + send(from, :ok) + next_state + end + def handle_event(:cast, {:send_message, ref, from, _msg, _timeout}, :disconnected, _data) do send(from, {ref, {:error, :closed}}) :keep_state_and_data diff --git a/lib/archethic/p2p/client/default_impl.ex b/lib/archethic/p2p/client/default_impl.ex index 716070a7f..94409a91a 100644 --- a/lib/archethic/p2p/client/default_impl.ex +++ b/lib/archethic/p2p/client/default_impl.ex @@ -66,4 +66,13 @@ defmodule Archethic.P2P.Client.DefaultImpl do end end end + + @doc """ + When receiving a message from a node, we set it connected + """ + @impl Client + @spec set_connected(Crypto.key()) :: :ok + def set_connected(node_public_key) do + Connection.set_connected(node_public_key) + end end diff --git a/lib/archethic/p2p/listener_protocol/broadway_pipeline.ex b/lib/archethic/p2p/listener_protocol/broadway_pipeline.ex index 883d2f1f2..430604512 100644 --- a/lib/archethic/p2p/listener_protocol/broadway_pipeline.ex +++ b/lib/archethic/p2p/listener_protocol/broadway_pipeline.ex @@ -7,6 +7,7 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do alias Archethic.P2P.MemTable alias Archethic.P2P.Message alias Archethic.P2P.MessageEnvelop + alias Archethic.P2P.Client alias Broadway.Message, as: BroadwayMessage @@ -59,6 +60,7 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do } = MessageEnvelop.decode(data) MemTable.increase_node_availability(sender_public_key) + Client.set_connected(sender_public_key) {System.monotonic_time(:millisecond), message_id, message, sender_public_key} end