From 46f11220c84fe717eaa593916b57b4792dd4b58d Mon Sep 17 00:00:00 2001 From: Neylix Date: Tue, 23 Aug 2022 18:58:17 +0200 Subject: [PATCH] Fix bootstrap issues (#537) * Remove update connection when receiving a message * Fix self repair error when having more dates to synchronize than CPU core * Start listener after bootstrap --- lib/archethic/beacon_chain.ex | 27 ++++++++++++----- lib/archethic/bootstrap.ex | 2 ++ lib/archethic/p2p/client.ex | 2 -- lib/archethic/p2p/client/connection.ex | 29 +++--------------- lib/archethic/p2p/client/default_impl.ex | 11 ------- lib/archethic/p2p/listener.ex | 10 ++++++- lib/archethic/p2p/listener_protocol.ex | 38 ++++++++++-------------- 7 files changed, 51 insertions(+), 68 deletions(-) diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index 5684887e9..d107f7ba4 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -298,7 +298,7 @@ defmodule Archethic.BeaconChain do authorized_nodes = P2P.authorized_and_available_nodes() list_subsets() - |> Flow.from_enumerable() + |> Flow.from_enumerable(stages: 256) |> Flow.flat_map(fn subset -> # Foreach subset and date we compute concurrently the node election dates @@ -316,15 +316,22 @@ defmodule Archethic.BeaconChain do fetch_summaries(node, addresses) end) # We repartition by summary time to aggregate summaries for a date - |> Flow.partition(key: {:key, :summary_time}) + |> Flow.partition(stages: System.schedulers_online() * 4, key: {:key, :summary_time}) |> Flow.reduce( - fn -> %SummaryAggregate{} end, - &SummaryAggregate.add_summary(&2, &1) + fn -> %{} end, + fn summary = %Summary{summary_time: time}, acc -> + Map.update( + acc, + time, + %SummaryAggregate{summary_time: time} |> SummaryAggregate.add_summary(summary), + &SummaryAggregate.add_summary(&1, summary) + ) + end ) - |> Flow.emit(:state) + |> Flow.on_trigger(&{Map.values(&1), &1}) |> Stream.reject(&SummaryAggregate.empty?/1) |> Stream.map(&SummaryAggregate.aggregate/1) - |> Enum.sort_by(& &1.summary_time) + |> Enum.sort_by(& &1.summary_time, {:asc, DateTime}) end defp get_summary_address_by_node(date, subset, authorized_nodes) do @@ -341,9 +348,15 @@ defmodule Archethic.BeaconChain do end defp fetch_summaries(node, addresses) do + Logger.info( + "Self repair start download #{Enum.count(addresses)} summaries on node #{Base.encode16(node.first_public_key)}" + ) + addresses |> Stream.chunk_every(10) - |> Stream.flat_map(&batch_summaries_fetching(&1, node)) + |> Task.async_stream(&batch_summaries_fetching(&1, node)) + |> Stream.filter(&match?({:ok, _}, &1)) + |> Stream.flat_map(&elem(&1, 1)) |> Enum.to_list() end diff --git a/lib/archethic/bootstrap.ex b/lib/archethic/bootstrap.ex index 32a890396..f724835b8 100644 --- a/lib/archethic/bootstrap.ex +++ b/lib/archethic/bootstrap.ex @@ -13,6 +13,7 @@ defmodule Archethic.Bootstrap do alias Archethic.P2P alias Archethic.P2P.Node + alias Archethic.P2P.Listener alias Archethic.SelfRepair @@ -210,6 +211,7 @@ defmodule Archethic.Bootstrap do SelfRepair.start_scheduler() :persistent_term.put(:archethic_up, :up) + Listener.listen() end defp first_initialization( diff --git a/lib/archethic/p2p/client.ex b/lib/archethic/p2p/client.ex index 4f11a0fc3..beddd1e49 100644 --- a/lib/archethic/p2p/client.ex +++ b/lib/archethic/p2p/client.ex @@ -22,6 +22,4 @@ defmodule Archethic.P2P.Client do {:ok, Message.response()} | {:error, :timeout} | {:error, :closed} - - @callback set_connected(Crypto.key()) :: :ok end diff --git a/lib/archethic/p2p/client/connection.ex b/lib/archethic/p2p/client/connection.ex index 2d38102a3..62f1fddf7 100644 --- a/lib/archethic/p2p/client/connection.ex +++ b/lib/archethic/p2p/client/connection.ex @@ -48,22 +48,6 @@ defmodule Archethic.P2P.Client.Connection do end end - @doc """ - Set the sate to connected - """ - @spec set_connected(Crypto.key()) :: :ok - def set_connected(public_key) do - GenStateMachine.cast(via_tuple(public_key), {:connect, self()}) - - receive do - :ok -> - :ok - after - 1000 -> - :ok - end - end - # fetch cnnoection details from registery for a node from its public key defp via_tuple(public_key), do: {:via, Registry, {ConnectionRegistry, public_key}} @@ -96,6 +80,8 @@ defmodule Archethic.P2P.Client.Connection do ) do Logger.warning("Connection closed", node: Base.encode16(node_public_key)) + MemTable.decrease_node_availability(node_public_key) + # Notify clients the connection is lost # and cancel the existing timeouts actions = @@ -118,21 +104,14 @@ defmodule Archethic.P2P.Client.Connection do data = %{ ip: ip, port: port, - transport: transport, - node_public_key: node_public_key + transport: transport } ) do case transport.handle_connect(ip, port) do {:ok, socket} -> {:next_state, {:connected, socket}, data} - {:error, reason} -> - Logger.debug( - "Error during node connection to #{:inet.ntoa(ip)}:#{port} - #{reason} ", - node: Base.encode16(node_public_key) - ) - - MemTable.decrease_node_availability(node_public_key) + {:error, _} -> actions = [{{:timeout, :reconnect}, 500, nil}] {:keep_state_and_data, actions} end diff --git a/lib/archethic/p2p/client/default_impl.ex b/lib/archethic/p2p/client/default_impl.ex index efcfa1fe6..d515b77df 100644 --- a/lib/archethic/p2p/client/default_impl.ex +++ b/lib/archethic/p2p/client/default_impl.ex @@ -8,7 +8,6 @@ defmodule Archethic.P2P.Client.DefaultImpl do alias Archethic.P2P.Client.Connection alias Archethic.P2P.Client.ConnectionSupervisor alias Archethic.P2P.Client.Transport.TCPImpl - alias Archethic.P2P.MemTable alias Archethic.P2P.Message alias Archethic.P2P.Node @@ -62,20 +61,10 @@ defmodule Archethic.P2P.Client.DefaultImpl do node: Base.encode16(node_public_key) ) - MemTable.decrease_node_availability(node_public_key) BeaconUpdate.unsubscribe(node_public_key) {:error, reason} end end end - - @doc """ - When receiving a message from a node, we set it connected - """ - @impl Client - @spec set_connected(Crypto.key()) :: :ok - def set_connected(node_public_key) do - Connection.set_connected(node_public_key) - end end diff --git a/lib/archethic/p2p/listener.ex b/lib/archethic/p2p/listener.ex index cd4a538eb..8db5fef11 100644 --- a/lib/archethic/p2p/listener.ex +++ b/lib/archethic/p2p/listener.ex @@ -15,6 +15,14 @@ defmodule Archethic.P2P.Listener do transport = Keyword.get(opts, :transport) port = Keyword.get(opts, :port) + {:ok, {transport, port}} + end + + def listen() do + GenServer.cast(__MODULE__, :start_listener) + end + + def handle_cast(:start_listener, {transport, port}) do ranch_transport = case transport do :tcp -> @@ -34,7 +42,7 @@ defmodule Archethic.P2P.Listener do {:ok, listener_pid} -> Logger.info("P2P #{transport} Endpoint running on port #{port}") - {:ok, %{listener_pid: listener_pid}} + {:noreply, %{listener_pid: listener_pid}} {:error, :eaddrinuse} -> Logger.error( diff --git a/lib/archethic/p2p/listener_protocol.ex b/lib/archethic/p2p/listener_protocol.ex index defceed96..47b7ca8f6 100644 --- a/lib/archethic/p2p/listener_protocol.ex +++ b/lib/archethic/p2p/listener_protocol.ex @@ -17,22 +17,17 @@ defmodule Archethic.P2P.ListenerProtocol do end def init({ref, transport, opts}) do - if :persistent_term.get(:archethic_up, nil) do - {:ok, socket} = :ranch.handshake(ref) - :ok = transport.setopts(socket, opts) + {:ok, socket} = :ranch.handshake(ref) + :ok = transport.setopts(socket, opts) - {:ok, {ip, port}} = :inet.peername(socket) + {:ok, {ip, port}} = :inet.peername(socket) - :gen_server.enter_loop(__MODULE__, [], %{ - socket: socket, - transport: transport, - ip: ip, - port: port - }) - else - # node is Bootstrapping, reject any incoming request messages - {:stop, :node_bootstrapping} - end + :gen_server.enter_loop(__MODULE__, [], %{ + socket: socket, + transport: transport, + ip: ip, + port: port + }) end def handle_info( @@ -41,16 +36,15 @@ defmodule Archethic.P2P.ListenerProtocol do ) do :inet.setopts(socket, active: :once) - %Archethic.P2P.MessageEnvelop{ - message_id: message_id, - message: message, - sender_public_key: sender_public_key - } = Archethic.P2P.MessageEnvelop.decode(msg) + Task.Supervisor.start_child(TaskSupervisor, fn -> + %Archethic.P2P.MessageEnvelop{ + message_id: message_id, + message: message, + sender_public_key: sender_public_key + } = Archethic.P2P.MessageEnvelop.decode(msg) - Archethic.P2P.MemTable.increase_node_availability(sender_public_key) - Archethic.P2P.Client.set_connected(sender_public_key) + Archethic.P2P.MemTable.increase_node_availability(sender_public_key) - Task.Supervisor.start_child(TaskSupervisor, fn -> response = Archethic.P2P.Message.process(message) encoded_response =