Skip to content

Commit

Permalink
Better p2p concurrency (#391)
Browse files Browse the repository at this point in the history
* Add Broadway pipeline globally for system-wide back pressure
* Async loading of beacon transaction
  • Loading branch information
Samuel committed Jun 24, 2022
1 parent 3a2c9a5 commit 8670586
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 103 deletions.
66 changes: 49 additions & 17 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ defmodule Archethic.BeaconChain do
alias Archethic.P2P.Node
alias Archethic.P2P.Message.RegisterBeaconUpdates

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp
alias Archethic.TransactionChain.TransactionData

alias Archethic.Utils
Expand Down Expand Up @@ -106,37 +109,66 @@ defmodule Archethic.BeaconChain do
@spec load_transaction(Transaction.t()) :: :ok | :error
def load_transaction(
tx = %Transaction{
address: tx_address,
type: :beacon,
data: %TransactionData{content: content}
data: %TransactionData{content: content},
validation_stamp: %ValidationStamp{
timestamp: timestamp
}
}
) do
with {%Slot{subset: subset, slot_time: slot_time} = slot, _} <- Slot.deserialize(content),
:ok <- validate_slot(tx, slot),
genesis_address <-
Crypto.derive_beacon_chain_address(subset, previous_summary_time(slot_time)),
:ok <- TransactionChain.write_transaction_at(tx, genesis_address) do
Logger.debug("New beacon transaction loaded - #{inspect(slot)}",
beacon_subset: Base.encode16(subset)
)
:ok <- validate_beacon_address(subset, slot_time, tx_address),
slot_time <- SlotTimer.previous_slot(timestamp) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
case validate_slot(slot) do
:ok ->
genesis_address =
Crypto.derive_beacon_chain_address(subset, previous_summary_time(slot_time))

:ok = TransactionChain.write_transaction_at(tx, genesis_address)

Logger.debug("New beacon transaction loaded - #{inspect(slot)}",
beacon_subset: Base.encode16(subset)
)

SummaryCache.add_slot(subset, slot)

SummaryCache.add_slot(subset, slot)
{:error, reason} ->
Logger.error("Invalid beacon slot - #{inspect(reason)}")
end
end)

:ok
else
{:error, _} = e ->
Logger.error("Invalid beacon slot #{inspect(e)}")
{:error, :invalid_address} ->
Logger.error("Invalid beacon slot - Invalid tx address")
:error

%DateTime{} ->
Logger.error("Invalid beacon slot - Invalid slot time")
:error

_ ->
Logger.error("Invalid beacon slot - Unexpected serialized data")
:error
end
end

def load_transaction(_), do: :ok

defp validate_slot(
%Transaction{address: address},
slot = %Slot{subset: subset, slot_time: slot_time}
) do
cond do
address != Crypto.derive_beacon_chain_address(subset, slot_time) ->
defp validate_beacon_address(subset, slot_time, address) do
case Crypto.derive_beacon_chain_address(subset, slot_time) do
^address ->
:ok

_ ->
{:error, :invalid_address}
end
end

defp validate_slot(slot = %Slot{}) do
cond do
!SlotValidation.valid_transaction_attestations?(slot) ->
{:error, :invalid_transaction_attestations}

Expand Down
5 changes: 1 addition & 4 deletions lib/archethic/p2p/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Archethic.P2P.Listener do
use GenServer

alias Archethic.P2P.ListenerProtocol
alias Archethic.P2P.ListenerProtocol.Supervisor, as: ListenerProtocolSupervisor

require Logger

Expand All @@ -25,8 +24,6 @@ defmodule Archethic.P2P.Listener do
transport
end

{:ok, listener_protocol_sup} = ListenerProtocolSupervisor.start_link()

{:ok, listener_pid} =
:ranch.start_listener(
:archethic_p2p,
Expand All @@ -38,6 +35,6 @@ defmodule Archethic.P2P.Listener do

Logger.info("P2P #{transport} Endpoint running on port #{port}")

{:ok, %{listener_pid: listener_pid, listener_protocol_sup_pid: listener_protocol_sup}}
{:ok, %{listener_pid: listener_pid}}
end
end
20 changes: 2 additions & 18 deletions lib/archethic/p2p/listener_protocol.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
defmodule Archethic.P2P.ListenerProtocol do
@moduledoc false

alias __MODULE__.BroadwayPipeline
alias __MODULE__.MessageProducer
alias __MODULE__.MessageProducerRegistry

require Logger

Expand All @@ -20,34 +18,20 @@ defmodule Archethic.P2P.ListenerProtocol do

{:ok, {ip, port}} = :inet.peername(socket)

{:ok, _pid} =
BroadwayPipeline.start_link(
socket: socket,
transport: transport,
conn_pid: self(),
ip: ip,
port: port
)

Process.sleep(100)

[{producer_pid, _}] = Registry.lookup(MessageProducerRegistry, socket)

:gen_server.enter_loop(__MODULE__, [], %{
socket: socket,
transport: transport,
producer_pid: producer_pid,
ip: ip,
port: port
})
end

def handle_info(
{_transport, socket, msg},
state = %{producer_pid: producer_pid}
state = %{transport: transport}
) do
:inet.setopts(socket, active: :once)
MessageProducer.new_message(producer_pid, msg)
MessageProducer.new_message({socket, transport, msg})
{:noreply, state}
end

Expand Down
43 changes: 9 additions & 34 deletions lib/archethic/p2p/listener_protocol/broadway_pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do

alias Archethic.Crypto

alias Archethic.P2P.ListenerProtocol.BroadwayPipelineRegistry
alias Archethic.P2P.ListenerProtocol.MessageProducer
alias Archethic.P2P.MemTable
alias Archethic.P2P.Message
Expand All @@ -15,54 +14,40 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do

use Broadway

def start_link(arg) do
socket = Keyword.get(arg, :socket)
transport = Keyword.get(arg, :transport)
ip = Keyword.get(arg, :ip)
port = Keyword.get(arg, :port)
conn_pid = Keyword.get(arg, :conn_pid)

def start_link(arg \\ []) do
Broadway.start_link(__MODULE__,
name: {:via, Registry, {BroadwayPipelineRegistry, {ip, port, conn_pid}}},
context: %{
socket: socket,
transport: transport
},
name: __MODULE__,
producer: [
module: {MessageProducer, arg},
transformer: {__MODULE__, :transform, []},
concurrency: 1
],
processors: [
default: [concurrency: 5, max_demand: 1]
default: [concurrency: System.schedulers_online() * 10, max_demand: 1]
]
)
end

def process_name(
{:via, Registry, {BroadwayPipelineRegistry, {ip, port, conn_pid}}},
base_name
) do
pid_string = conn_pid |> :erlang.pid_to_list() |> :erlang.list_to_binary()
:"#{:inet.ntoa(ip)}:#{port}.#{pid_string}.Broadway.#{base_name}"
end

def transform(event, _) do
%BroadwayMessage{
data: event,
acknowledger: {Broadway.NoopAcknowledger, _ack_ref = nil, _ack_data = nil}
}
end

def handle_message(_, message, %{socket: socket, transport: transport}) do
BroadwayMessage.update_data(message, fn data ->
def handle_message(_, message, _context) do
# start_time = System.monotonic_time(:millisecond)

BroadwayMessage.update_data(message, fn {socket, transport, data} ->
message =
data
|> decode()
|> process()
|> encode()

transport.send(socket, message)
# end_time = System.monotnonic_time(:millisecond)
# Logger.debug("Request processed in #{end_time - start_time} ms")
end)
end

Expand All @@ -73,11 +58,6 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do
sender_public_key: sender_public_key
} = MessageEnvelop.decode(data)

# Logger.debug("Receive message #{Message.name(message)}",
# node: Base.encode16(sender_public_key),
# message_id: message_id
# )

MemTable.increase_node_availability(sender_public_key)
{System.monotonic_time(:millisecond), message_id, message, sender_public_key}
end
Expand All @@ -86,11 +66,6 @@ defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do
response = Message.process(message)
# end_time = System.monotonic_time(:millisecond)

# Logger.debug("Message #{Message.name(message)} processed in #{end_time - start_time} ms",
# node: Base.encode16(sender_public_key),
# message_id: message_id
# )

{message_id, response, sender_public_key}
end

Expand Down
20 changes: 11 additions & 9 deletions lib/archethic/p2p/listener_protocol/message_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,26 @@ defmodule Archethic.P2P.ListenerProtocol.MessageProducer do
@moduledoc false
use GenStage

alias Archethic.P2P.ListenerProtocol.MessageProducerRegistry
alias Archethic.P2P.ListenerProtocol.BroadwayPipeline.Broadway.Producer_0, as: Producer

def start_link(arg) do
GenStage.start_link(__MODULE__, arg)
def start_link(arg, opts \\ [name: __MODULE__]) do
GenStage.start_link(__MODULE__, arg, opts)
end

def new_message(pid, message) do
def new_message(pid \\ Producer, message) do
GenStage.cast(pid, {:new_message, message})
end

def init(arg) do
socket = Keyword.get(arg, :socket)
Registry.register(MessageProducerRegistry, socket, [])
def init(_arg) do
{:producer, %{demand: 0, queue: :queue.new()}}
end

def handle_cast({:new_message, message}, state = %{queue: queue, demand: pending_demand}) do
queue = :queue.in(message, queue)
def handle_cast(
{:new_message, {socket, transport, message}},
state = %{queue: queue, demand: pending_demand}
) do
queue = :queue.in({socket, transport, message}, queue)

dispatch_events(queue, pending_demand, [], state)
end

Expand Down
21 changes: 0 additions & 21 deletions lib/archethic/p2p/listener_protocol/supervisor.ex

This file was deleted.

2 changes: 2 additions & 0 deletions lib/archethic/p2p/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Archethic.P2P.Supervisor do
alias Archethic.P2P.Client.ConnectionRegistry
alias Archethic.P2P.Client.ConnectionSupervisor
alias Archethic.P2P.Listener
alias Archethic.P2P.ListenerProtocol.BroadwayPipeline
alias Archethic.P2P.MemTable
alias Archethic.P2P.MemTableLoader
alias Archethic.P2P.GeoPatch.GeoIP.MaxMindDB
Expand All @@ -30,6 +31,7 @@ defmodule Archethic.P2P.Supervisor do
MaxMindDB,
MemTable,
MemTableLoader,
BroadwayPipeline,
{Listener, Keyword.put(listener_conf, :port, port)},
{BootstrappingSeeds, bootstraping_seeds_conf}
]
Expand Down
6 changes: 6 additions & 0 deletions test/archethic/beacon_chain_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ defmodule Archethic.BeaconChainTest do
authorization_date: DateTime.utc_now() |> DateTime.add(-10)
})

me = self()

MockDB
|> expect(:write_transaction_at, fn _, _ ->
send(me, :tx_written)
:ok
end)

Expand All @@ -95,6 +98,9 @@ defmodule Archethic.BeaconChainTest do
}

assert :ok = BeaconChain.load_transaction(tx)
assert_receive :tx_written

Process.sleep(500)

assert [%Slot{subset: <<0>>}] = SummaryCache.pop_slots(<<0>>)
end
Expand Down

0 comments on commit 8670586

Please sign in to comment.