Skip to content

Commit

Permalink
Fix bootstrap issues (#537)
Browse files Browse the repository at this point in the history
* Remove update connection when receiving a message
* Fix self repair error when having more dates to synchronize than CPU core
* Start listener after bootstrap
  • Loading branch information
Neylix authored and Samuel committed Aug 25, 2022
1 parent c9423ee commit 46f1122
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 68 deletions.
27 changes: 20 additions & 7 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ defmodule Archethic.BeaconChain do
authorized_nodes = P2P.authorized_and_available_nodes()

list_subsets()
|> Flow.from_enumerable()
|> Flow.from_enumerable(stages: 256)
|> Flow.flat_map(fn subset ->
# Foreach subset and date we compute concurrently the node election
dates
Expand All @@ -316,15 +316,22 @@ defmodule Archethic.BeaconChain do
fetch_summaries(node, addresses)
end)
# We repartition by summary time to aggregate summaries for a date
|> Flow.partition(key: {:key, :summary_time})
|> Flow.partition(stages: System.schedulers_online() * 4, key: {:key, :summary_time})
|> Flow.reduce(
fn -> %SummaryAggregate{} end,
&SummaryAggregate.add_summary(&2, &1)
fn -> %{} end,
fn summary = %Summary{summary_time: time}, acc ->
Map.update(
acc,
time,
%SummaryAggregate{summary_time: time} |> SummaryAggregate.add_summary(summary),
&SummaryAggregate.add_summary(&1, summary)
)
end
)
|> Flow.emit(:state)
|> Flow.on_trigger(&{Map.values(&1), &1})
|> Stream.reject(&SummaryAggregate.empty?/1)
|> Stream.map(&SummaryAggregate.aggregate/1)
|> Enum.sort_by(& &1.summary_time)
|> Enum.sort_by(& &1.summary_time, {:asc, DateTime})
end

defp get_summary_address_by_node(date, subset, authorized_nodes) do
Expand All @@ -341,9 +348,15 @@ defmodule Archethic.BeaconChain do
end

defp fetch_summaries(node, addresses) do
Logger.info(
"Self repair start download #{Enum.count(addresses)} summaries on node #{Base.encode16(node.first_public_key)}"
)

addresses
|> Stream.chunk_every(10)
|> Stream.flat_map(&batch_summaries_fetching(&1, node))
|> Task.async_stream(&batch_summaries_fetching(&1, node))
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(&elem(&1, 1))
|> Enum.to_list()
end

Expand Down
2 changes: 2 additions & 0 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Archethic.Bootstrap do

alias Archethic.P2P
alias Archethic.P2P.Node
alias Archethic.P2P.Listener

alias Archethic.SelfRepair

Expand Down Expand Up @@ -210,6 +211,7 @@ defmodule Archethic.Bootstrap do
SelfRepair.start_scheduler()

:persistent_term.put(:archethic_up, :up)
Listener.listen()
end

defp first_initialization(
Expand Down
2 changes: 0 additions & 2 deletions lib/archethic/p2p/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,4 @@ defmodule Archethic.P2P.Client do
{:ok, Message.response()}
| {:error, :timeout}
| {:error, :closed}

@callback set_connected(Crypto.key()) :: :ok
end
29 changes: 4 additions & 25 deletions lib/archethic/p2p/client/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,6 @@ defmodule Archethic.P2P.Client.Connection do
end
end

@doc """
Set the sate to connected
"""
@spec set_connected(Crypto.key()) :: :ok
def set_connected(public_key) do
GenStateMachine.cast(via_tuple(public_key), {:connect, self()})

receive do
:ok ->
:ok
after
1000 ->
:ok
end
end

# fetch cnnoection details from registery for a node from its public key
defp via_tuple(public_key), do: {:via, Registry, {ConnectionRegistry, public_key}}

Expand Down Expand Up @@ -96,6 +80,8 @@ defmodule Archethic.P2P.Client.Connection do
) do
Logger.warning("Connection closed", node: Base.encode16(node_public_key))

MemTable.decrease_node_availability(node_public_key)

# Notify clients the connection is lost
# and cancel the existing timeouts
actions =
Expand All @@ -118,21 +104,14 @@ defmodule Archethic.P2P.Client.Connection do
data = %{
ip: ip,
port: port,
transport: transport,
node_public_key: node_public_key
transport: transport
}
) do
case transport.handle_connect(ip, port) do
{:ok, socket} ->
{:next_state, {:connected, socket}, data}

{:error, reason} ->
Logger.debug(
"Error during node connection to #{:inet.ntoa(ip)}:#{port} - #{reason} ",
node: Base.encode16(node_public_key)
)

MemTable.decrease_node_availability(node_public_key)
{:error, _} ->
actions = [{{:timeout, :reconnect}, 500, nil}]
{:keep_state_and_data, actions}
end
Expand Down
11 changes: 0 additions & 11 deletions lib/archethic/p2p/client/default_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ defmodule Archethic.P2P.Client.DefaultImpl do
alias Archethic.P2P.Client.Connection
alias Archethic.P2P.Client.ConnectionSupervisor
alias Archethic.P2P.Client.Transport.TCPImpl
alias Archethic.P2P.MemTable
alias Archethic.P2P.Message
alias Archethic.P2P.Node

Expand Down Expand Up @@ -62,20 +61,10 @@ defmodule Archethic.P2P.Client.DefaultImpl do
node: Base.encode16(node_public_key)
)

MemTable.decrease_node_availability(node_public_key)
BeaconUpdate.unsubscribe(node_public_key)

{:error, reason}
end
end
end

@doc """
When receiving a message from a node, we set it connected
"""
@impl Client
@spec set_connected(Crypto.key()) :: :ok
def set_connected(node_public_key) do
Connection.set_connected(node_public_key)
end
end
10 changes: 9 additions & 1 deletion lib/archethic/p2p/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ defmodule Archethic.P2P.Listener do
transport = Keyword.get(opts, :transport)
port = Keyword.get(opts, :port)

{:ok, {transport, port}}
end

def listen() do
GenServer.cast(__MODULE__, :start_listener)
end

def handle_cast(:start_listener, {transport, port}) do
ranch_transport =
case transport do
:tcp ->
Expand All @@ -34,7 +42,7 @@ defmodule Archethic.P2P.Listener do
{:ok, listener_pid} ->
Logger.info("P2P #{transport} Endpoint running on port #{port}")

{:ok, %{listener_pid: listener_pid}}
{:noreply, %{listener_pid: listener_pid}}

{:error, :eaddrinuse} ->
Logger.error(
Expand Down
38 changes: 16 additions & 22 deletions lib/archethic/p2p/listener_protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,17 @@ defmodule Archethic.P2P.ListenerProtocol do
end

def init({ref, transport, opts}) do
if :persistent_term.get(:archethic_up, nil) do
{:ok, socket} = :ranch.handshake(ref)
:ok = transport.setopts(socket, opts)
{:ok, socket} = :ranch.handshake(ref)
:ok = transport.setopts(socket, opts)

{:ok, {ip, port}} = :inet.peername(socket)
{:ok, {ip, port}} = :inet.peername(socket)

:gen_server.enter_loop(__MODULE__, [], %{
socket: socket,
transport: transport,
ip: ip,
port: port
})
else
# node is Bootstrapping, reject any incoming request messages
{:stop, :node_bootstrapping}
end
:gen_server.enter_loop(__MODULE__, [], %{
socket: socket,
transport: transport,
ip: ip,
port: port
})
end

def handle_info(
Expand All @@ -41,16 +36,15 @@ defmodule Archethic.P2P.ListenerProtocol do
) do
:inet.setopts(socket, active: :once)

%Archethic.P2P.MessageEnvelop{
message_id: message_id,
message: message,
sender_public_key: sender_public_key
} = Archethic.P2P.MessageEnvelop.decode(msg)
Task.Supervisor.start_child(TaskSupervisor, fn ->
%Archethic.P2P.MessageEnvelop{
message_id: message_id,
message: message,
sender_public_key: sender_public_key
} = Archethic.P2P.MessageEnvelop.decode(msg)

Archethic.P2P.MemTable.increase_node_availability(sender_public_key)
Archethic.P2P.Client.set_connected(sender_public_key)
Archethic.P2P.MemTable.increase_node_availability(sender_public_key)

Task.Supervisor.start_child(TaskSupervisor, fn ->
response = Archethic.P2P.Message.process(message)

encoded_response =
Expand Down

0 comments on commit 46f1122

Please sign in to comment.