diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index 3632ed02a..9a057c2eb 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -14,6 +14,7 @@ defmodule Archethic.BeaconChain do alias __MODULE__.Summary alias __MODULE__.SummaryAggregate alias __MODULE__.SummaryTimer + alias __MODULE__.Update alias Archethic.Crypto @@ -23,7 +24,6 @@ defmodule Archethic.BeaconChain do alias Archethic.P2P.Node alias Archethic.P2P.Message.GetBeaconSummaries alias Archethic.P2P.Message.BeaconSummaryList - alias Archethic.P2P.Message.RegisterBeaconUpdates alias Archethic.TaskSupervisor @@ -255,10 +255,10 @@ defmodule Archethic.BeaconChain do Enum.map(list_subsets(), fn subset -> nodes = Election.beacon_storage_nodes(subset, date, P2P.authorized_and_available_nodes()) - P2P.broadcast_message(nodes, %RegisterBeaconUpdates{ - node_public_key: Crypto.first_node_public_key(), - subset: subset - }) + nodes = + Enum.reject(nodes, fn node -> node.first_public_key == Crypto.first_node_public_key() end) + + Update.subscribe(nodes, subset) end) end @@ -283,7 +283,7 @@ defmodule Archethic.BeaconChain do | /\ | /\ | [D1] [D2] [D3] Partition by date | | | - [ ] [ ] [ ] Aggregate and consolidate summaries + [ ] [ ] [ ] Aggregate and consolidate summaries \ | / \ | / \ | / @@ -299,7 +299,7 @@ defmodule Archethic.BeaconChain do list_subsets() |> Flow.from_enumerable() |> Flow.flat_map(fn subset -> - # Foreach subset and date we compute concurrently the node election + # Foreach subset and date we compute concurrently the node election dates |> Stream.map(&get_summary_address_by_node(&1, subset, authorized_nodes)) |> Enum.flat_map(& &1) diff --git a/lib/archethic/beacon_chain/slot.ex b/lib/archethic/beacon_chain/slot.ex index 5de4b422a..290eb0af4 100644 --- a/lib/archethic/beacon_chain/slot.ex +++ b/lib/archethic/beacon_chain/slot.ex @@ -52,6 +52,8 @@ defmodule Archethic.BeaconChain.Slot do If the transaction summary already exists, it will append the confirmation node with the node public key and its signature. + Return true if transaction summary is new, false if transaction summary is updated + ## Examples Add the first confirmation @@ -74,7 +76,7 @@ defmodule Archethic.BeaconChain.Slot do ...> 32, 180, 47, 189, 143, 239, 156, 56, 234, 236, 128, 17, 79, 236, 211, 124, ...> 158, 142, 23, 151, 43, 50, 153, 52, 195, 144, 226, 247, 65>>}] ...> }) - %Slot{ + {true, %Slot{ transaction_attestations: [ %ReplicationAttestation{ transaction_summary: %TransactionSummary{ @@ -99,7 +101,7 @@ defmodule Archethic.BeaconChain.Slot do ] } ] - } + }} Append confirmation @@ -140,7 +142,7 @@ defmodule Archethic.BeaconChain.Slot do ...> 239, 151, 241, 35, 93, 254, 65, 201, 152, 57, 187, 225, 86, 235, 56, 206, 134, ...> 141, 174, 141, 29, 28, 173, 17, 4, 78, 129, 33, 68, 4>>}], ...> }) - %Slot{ + {false, %Slot{ transaction_attestations: [ %ReplicationAttestation{ transaction_summary: %TransactionSummary{ @@ -172,7 +174,7 @@ defmodule Archethic.BeaconChain.Slot do ] } ] - } + }} Append transaction attestations @@ -207,7 +209,7 @@ defmodule Archethic.BeaconChain.Slot do ...> 239, 151, 241, 35, 93, 254, 65, 201, 152, 57, 187, 225, 86, 235, 56, 206, 134, ...> 141, 174, 141, 29, 28, 173, 17, 4, 78, 129, 33, 68, 4>>}], ...> }) - %Slot{ + {true, %Slot{ transaction_attestations: [ %ReplicationAttestation{ transaction_summary: %TransactionSummary{ @@ -243,14 +245,14 @@ defmodule Archethic.BeaconChain.Slot do ] } ] - } + }} """ @spec add_transaction_attestation( __MODULE__.t(), ReplicationAttestation.t() ) :: - __MODULE__.t() + {boolean(), __MODULE__.t()} def add_transaction_attestation( slot = %__MODULE__{transaction_attestations: transaction_attestations}, attestation = %ReplicationAttestation{ @@ -263,10 +265,10 @@ defmodule Archethic.BeaconChain.Slot do &(&1.transaction_summary.address == tx_address) ) do nil -> - Map.update!(slot, :transaction_attestations, &[attestation | &1]) + {true, Map.update!(slot, :transaction_attestations, &[attestation | &1])} index -> - add_transaction_attestation_confirmations(slot, index, confirmations) + {false, add_transaction_attestation_confirmations(slot, index, confirmations)} end end diff --git a/lib/archethic/beacon_chain/subset.ex b/lib/archethic/beacon_chain/subset.ex index c4c7e79ed..258e0fcad 100644 --- a/lib/archethic/beacon_chain/subset.ex +++ b/lib/archethic/beacon_chain/subset.ex @@ -153,18 +153,25 @@ defmodule Archethic.BeaconChain.Subset do ) do with ^subset <- BeaconChain.subset_from_address(address), ^slot_time <- SlotTimer.next_slot(timestamp) do - new_slot = + {new_tx?, new_slot} = Slot.add_transaction_attestation( current_slot, attestation ) - Logger.info( - "Transaction #{type}@#{Base.encode16(address)} added to the beacon chain (in #{DateTime.to_string(slot_time)} slot)", - beacon_subset: Base.encode16(subset) - ) + if new_tx? do + Logger.info( + "Transaction #{type}@#{Base.encode16(address)} added to the beacon chain (in #{DateTime.to_string(slot_time)} slot)", + beacon_subset: Base.encode16(subset) + ) - notify_subscribed_nodes(subscribed_nodes, attestation) + notify_subscribed_nodes(subscribed_nodes, attestation) + else + Logger.info( + "New confirmation for transaction #{type}@#{Base.encode16(address)} added to the beacon chain (in #{DateTime.to_string(slot_time)} slot)", + beacon_subset: Base.encode16(subset) + ) + end # Request the P2P view sampling if the not perfomed from the last 3 seconds if update_p2p_view?(state) do @@ -194,11 +201,23 @@ defmodule Archethic.BeaconChain.Subset do end defp notify_subscribed_nodes(nodes, %ReplicationAttestation{ - transaction_summary: tx_summary + transaction_summary: + tx_summary = %TransactionSummary{timestamp: timestamp, address: address} }) do + PubSub.notify_transaction_attestation(tx_summary) + + # Do not notify beacon storage nodes as they are already aware of the transaction + beacon_storage_nodes = + Election.beacon_storage_nodes( + BeaconChain.subset_from_address(address), + BeaconChain.next_slot(timestamp), + P2P.authorized_nodes(timestamp) + ) + |> Enum.map(& &1.first_public_key) + nodes |> P2P.get_nodes_info() - |> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key())) + |> Enum.reject(&Enum.member?(beacon_storage_nodes, &1.first_public_key)) |> P2P.broadcast_message(tx_summary) end diff --git a/lib/archethic/beacon_chain/supervisor.ex b/lib/archethic/beacon_chain/supervisor.ex index 4f4058325..860f5ebb2 100644 --- a/lib/archethic/beacon_chain/supervisor.ex +++ b/lib/archethic/beacon_chain/supervisor.ex @@ -6,6 +6,7 @@ defmodule Archethic.BeaconChain.Supervisor do alias Archethic.BeaconChain.SlotTimer alias Archethic.BeaconChain.SummaryTimer alias Archethic.BeaconChain.SubsetSupervisor + alias Archethic.BeaconChain.Update alias Archethic.Utils @@ -20,7 +21,7 @@ defmodule Archethic.BeaconChain.Supervisor do {SummaryTimer, Application.get_env(:archethic, SummaryTimer), []} ]) - children = schedulers ++ [SubsetSupervisor] + children = schedulers ++ [SubsetSupervisor, Update] Supervisor.init(children, strategy: :one_for_one) end diff --git a/lib/archethic/beacon_chain/update.ex b/lib/archethic/beacon_chain/update.ex new file mode 100644 index 000000000..1b0569b2b --- /dev/null +++ b/lib/archethic/beacon_chain/update.ex @@ -0,0 +1,77 @@ +defmodule Archethic.BeaconChain.Update do + @moduledoc false + + use GenServer + + alias Archethic.Crypto + + alias Archethic.P2P + alias Archethic.P2P.Node + alias Archethic.P2P.Message.RegisterBeaconUpdates + + alias Archethic.TaskSupervisor + + def start_link(args \\ [], opts \\ [name: __MODULE__]) do + GenServer.start_link(__MODULE__, args, opts) + end + + @doc """ + Subscribe for Beacon update to a node if not already subscribed + """ + @spec subscribe(list(Node.t()), binary()) :: :ok + def subscribe(nodes, subset) do + GenServer.cast(__MODULE__, {:subscribe, nodes, subset}) + end + + @doc """ + Unsubscribe for Beacon update to a node + """ + @spec unsubscribe(binary()) :: :ok + def unsubscribe(node_public_key) do + GenServer.cast(__MODULE__, {:unsubscribe, node_public_key}) + end + + def init(_args) do + {:ok, Map.new()} + end + + def handle_cast({:subscribe, nodes, subset}, state) do + nodes_to_subscribe = + Enum.reject(nodes, fn %Node{first_public_key: public_key} -> + Map.get(state, public_key, []) |> Enum.member?(subset) + end) + + message = %RegisterBeaconUpdates{ + node_public_key: Crypto.first_node_public_key(), + subset: subset + } + + new_state = + if Enum.empty?(nodes_to_subscribe) do + state + else + Task.Supervisor.async_stream( + TaskSupervisor, + nodes_to_subscribe, + fn node -> + {P2P.send_message(node, message), node.first_public_key} + end, + ordered: false, + on_timeout: :kill_task + ) + |> Stream.filter(&match?({:ok, {{:ok, _}, _}}, &1)) + |> Stream.map(fn {:ok, {{:ok, _response}, public_key}} -> public_key end) + |> Enum.reduce(state, fn public_key, acc -> + Map.update(acc, public_key, [subset], fn subsets -> [subset | subsets] end) + end) + end + + {:noreply, new_state} + end + + def handle_cast({:unsubscribe, node_public_key}, state) do + new_state = Map.delete(state, node_public_key) + + {:noreply, new_state} + end +end diff --git a/lib/archethic/p2p/client/default_impl.ex b/lib/archethic/p2p/client/default_impl.ex index 94409a91a..efcfa1fe6 100644 --- a/lib/archethic/p2p/client/default_impl.ex +++ b/lib/archethic/p2p/client/default_impl.ex @@ -12,6 +12,8 @@ defmodule Archethic.P2P.Client.DefaultImpl do alias Archethic.P2P.Message alias Archethic.P2P.Node + alias Archethic.BeaconChain.Update, as: BeaconUpdate + require Logger @behaviour Client @@ -61,6 +63,7 @@ defmodule Archethic.P2P.Client.DefaultImpl do ) MemTable.decrease_node_availability(node_public_key) + BeaconUpdate.unsubscribe(node_public_key) {:error, reason} end diff --git a/lib/archethic_web/live/chains/beacon_live.ex b/lib/archethic_web/live/chains/beacon_live.ex index 123af531e..38d692cd9 100644 --- a/lib/archethic_web/live/chains/beacon_live.ex +++ b/lib/archethic_web/live/chains/beacon_live.ex @@ -4,7 +4,6 @@ defmodule ArchethicWeb.BeaconChainLive do alias Archethic.BeaconChain alias Archethic.BeaconChain.Slot - alias Archethic.BeaconChain.SummaryTimer alias Archethic.Crypto @@ -218,7 +217,7 @@ defmodule ArchethicWeb.BeaconChainLive do |> Enum.at(0) enrollment_date - |> SummaryTimer.previous_summaries() + |> BeaconChain.previous_summary_dates() |> Enum.sort({:desc, DateTime}) end diff --git a/test/archethic/beacon_chain/update_test.exs b/test/archethic/beacon_chain/update_test.exs new file mode 100644 index 000000000..32bccddee --- /dev/null +++ b/test/archethic/beacon_chain/update_test.exs @@ -0,0 +1,90 @@ +defmodule Archethic.BeaconChain.UpdateTest do + use ArchethicCase + + alias Archethic.BeaconChain.Update + + alias Archethic.P2P.Node + + import Mox + import GenServer + + test "should create new entries in map" do + {:ok, pid} = Update.start_link([], []) + me = self() + + MockClient + |> stub(:send_message, fn _, _, _ -> + send(me, :message_sent) + {:ok, nil} + end) + + nodes = [ + %Node{first_public_key: "123"}, + %Node{first_public_key: "456"} + ] + + GenServer.cast(pid, {:subscribe, nodes, <<200>>}) + assert_receive :message_sent + assert %{"123" => [<<200>>], "456" => [<<200>>]} = :sys.get_state(pid) + end + + test "should not duplicate node key" do + {:ok, pid} = Update.start_link([], []) + me = self() + + MockClient + |> stub(:send_message, fn _, _, _ -> + send(me, :message_sent) + {:ok, nil} + end) + + nodes = [%Node{first_public_key: "123"}] + + GenServer.cast(pid, {:subscribe, nodes, <<200>>}) + assert_receive :message_sent + assert %{"123" => [<<200>>]} = :sys.get_state(pid) + + GenServer.cast(pid, {:subscribe, nodes, <<200>>}) + refute_receive :message_sent + assert %{"123" => [<<200>>]} = :sys.get_state(pid) + end + + test "should add subset to node" do + {:ok, pid} = Update.start_link([], []) + me = self() + + MockClient + |> stub(:send_message, fn _, _, _ -> + send(me, :message_sent) + {:ok, nil} + end) + + nodes = [%Node{first_public_key: "123"}] + + GenServer.cast(pid, {:subscribe, nodes, <<200>>}) + assert_receive :message_sent + assert %{"123" => [<<200>>]} = :sys.get_state(pid) + + GenServer.cast(pid, {:subscribe, nodes, <<100>>}) + assert_receive :message_sent + assert %{"123" => [<<100>>, <<200>>]} = :sys.get_state(pid) + end + + test "should delete node from state" do + {:ok, pid} = Update.start_link([], []) + + MockClient + |> stub(:send_message, fn _, _, _ -> {:ok, nil} end) + + nodes = [ + %Node{first_public_key: "123"}, + %Node{first_public_key: "456"} + ] + + GenServer.cast(pid, {:subscribe, nodes, <<200>>}) + assert %{"123" => [<<200>>], "456" => [<<200>>]} = :sys.get_state(pid) + + GenServer.cast(pid, {:unsubscribe, "123"}) + assert %{"456" => [<<200>>]} = :sys.get_state(pid) + end +end