Skip to content

Commit

Permalink
Fixed multiple pub sub causing crash due to new replication attestati…
Browse files Browse the repository at this point in the history
…ons (#432)
  • Loading branch information
imnik11 committed Jul 8, 2022
1 parent f1ab364 commit 7b09579
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 8 deletions.
6 changes: 4 additions & 2 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,13 @@ defmodule Archethic.BeaconChain.Subset do
end
end

defp notify_subscribed_nodes(nodes, attestation) do
defp notify_subscribed_nodes(nodes, %ReplicationAttestation{
transaction_summary: tx_summary
}) do
nodes
|> P2P.get_nodes_info()
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))
|> P2P.broadcast_message(attestation)
|> P2P.broadcast_message(tx_summary)
end

defp handle_slot(
Expand Down
12 changes: 11 additions & 1 deletion lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1304,7 +1304,17 @@ defmodule Archethic.P2P.Message do
end

def process(%BeaconUpdate{transaction_attestations: transaction_attestations}) do
Enum.each(transaction_attestations, &process/1)
Enum.each(transaction_attestations, fn %ReplicationAttestation{
transaction_summary: tx_summary
} ->
process(tx_summary)
end)

%Ok{}
end

def process(tx_summary = %TransactionSummary{}) do
PubSub.notify_transaction_attestation(tx_summary)

%Ok{}
end
Expand Down
20 changes: 20 additions & 0 deletions lib/archethic/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Archethic.PubSub do

alias Archethic.TransactionChain.Transaction

alias Archethic.TransactionChain.TransactionSummary

@doc """
Notify the registered processes than a new transaction has been validated
"""
Expand Down Expand Up @@ -90,6 +92,17 @@ defmodule Archethic.PubSub do
)
end

@doc """
Notify a new transaction attestation for beacon explorer
"""
@spec notify_transaction_attestation(TransactionSummary.t()) :: :ok
def notify_transaction_attestation(attestation = %TransactionSummary{}) do
dispatch(
:new_transaction_attestation,
{:new_transaction_attestation, attestation}
)
end

@doc """
Register a process to a new transaction publication by type
"""
Expand Down Expand Up @@ -177,6 +190,13 @@ defmodule Archethic.PubSub do
Registry.register(PubSubRegistry, :new_replication_attestation, [])
end

@doc """
Register to new transaction attestations for beacon explorer
"""
def register_to_new_transaction_attestations do
Registry.register(PubSubRegistry, :new_transaction_attestation, [])
end

defp dispatch(topic, message) do
Registry.dispatch(PubSubRegistry, topic, fn entries ->
for {pid, _} <- entries, do: send(pid, message)
Expand Down
8 changes: 3 additions & 5 deletions lib/archethic_web/live/chains/beacon_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ defmodule ArchethicWeb.BeaconChainLive do
use ArchethicWeb, :live_view

alias Archethic.BeaconChain
alias Archethic.BeaconChain.ReplicationAttestation
alias Archethic.BeaconChain.Slot
alias Archethic.BeaconChain.SummaryTimer

Expand All @@ -19,7 +18,7 @@ defmodule ArchethicWeb.BeaconChainLive do
alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionData

alias Archethic.TransactionChain.TransactionSummary
alias ArchethicWeb.ExplorerView
alias ArchethicWeb.TransactionCache

Expand All @@ -33,8 +32,7 @@ defmodule ArchethicWeb.BeaconChainLive do
if connected?(socket) do
PubSub.register_to_next_summary_time()
PubSub.register_to_current_epoch_of_slot_time()
PubSub.register_to_new_replication_attestations()

PubSub.register_to_new_transaction_attestations()
# register for client to able to get the current added transaction to the beacon pool
BeaconChain.register_to_beacon_pool_updates()
end
Expand Down Expand Up @@ -141,7 +139,7 @@ defmodule ArchethicWeb.BeaconChainLive do
end

def handle_info(
{:new_replication_attestation, %ReplicationAttestation{transaction_summary: tx_summary}},
{:new_transaction_attestation, tx_summary = %TransactionSummary{}},
socket = %{
assigns:
assigns = %{
Expand Down

0 comments on commit 7b09579

Please sign in to comment.