Skip to content

Commit

Permalink
Ensure destination of elected message
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Nov 8, 2022
1 parent 1551240 commit 46207b9
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 210 deletions.
74 changes: 66 additions & 8 deletions lib/archethic/beacon_chain/replication_attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do
alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.GetTransactionSummary
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Node

alias Archethic.TransactionChain.TransactionSummary

require Logger

defstruct [:transaction_summary, confirmations: [], version: 1]

@type t :: %__MODULE__{
Expand Down Expand Up @@ -182,20 +186,74 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do

storage_nodes = Election.chain_storage_nodes_with_type(tx_address, tx_type, authorized_nodes)

if valid_confirmations?(confirmations, tx_summary_payload, storage_nodes) do
with :ok <- validate_confirmations(confirmations, tx_summary_payload, storage_nodes) do
check_transaction_summary(storage_nodes, tx_summary)
end
end

defp validate_confirmations([], _, _), do: {:error, :invalid_confirmations_signatures}

defp validate_confirmations(confirmations, tx_summary_payload, storage_nodes) do
valid_confirmations? =
Enum.all?(confirmations, fn {node_index, signature} ->
%Node{first_public_key: node_public_key} = Enum.at(storage_nodes, node_index)
Crypto.verify?(signature, tx_summary_payload, node_public_key)
end)

if valid_confirmations? do
:ok
else
{:error, :invalid_confirmations_signatures}
end
end

defp valid_confirmations?([], _, _), do: false
defp check_transaction_summary(nodes, expected_summary, timeout \\ 500)

defp valid_confirmations?(confirmations, tx_summary_payload, storage_nodes) do
confirmations
|> Enum.all?(fn {node_index, signature} ->
%Node{first_public_key: node_public_key} = Enum.at(storage_nodes, node_index)
Crypto.verify?(signature, tx_summary_payload, node_public_key)
end)
defp check_transaction_summary([], _, _), do: {:error, :network_issue}

defp check_transaction_summary(
nodes,
expected_summary = %TransactionSummary{
address: address,
type: type
},
_timeout
) do
conflict_resolver = fn results ->
case Enum.find(results, &match?(%TransactionSummary{address: ^address, type: ^type}, &1)) do
nil ->
%NotFound{}

tx_summary ->
tx_summary
end
end

case P2P.quorum_read(
nodes,
%GetTransactionSummary{address: address},
conflict_resolver
) do
{:ok, ^expected_summary} ->
:ok

{:ok, recv = %TransactionSummary{}} ->
Logger.warning(
"Transaction summary received is different #{inspect(recv)} - expect #{inspect(expected_summary)}",
transaction_address: Base.encode16(address),
transaction_type: type
)

{:ok, %NotFound{}} ->
Logger.warning("Transaction summary was not found",
transaction_address: Base.encode16(address),
transaction_type: type
)

{:error, :invalid_summary}

{:error, :network_issue} ->
{:error, :network_issue}
end
end
end
94 changes: 7 additions & 87 deletions lib/archethic/beacon_chain/slot/validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@ defmodule Archethic.BeaconChain.Slot.Validation do
alias Archethic.BeaconChain.Slot
alias Archethic.BeaconChain.Slot.EndOfNodeSync

alias Archethic.Crypto

alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.GetTransactionSummary
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Node

alias Archethic.TaskSupervisor
Expand All @@ -37,21 +31,16 @@ defmodule Archethic.BeaconChain.Slot.Validation do

defp valid_transaction_attestation(
attestation = %ReplicationAttestation{
transaction_summary:
tx_summary = %TransactionSummary{
address: address,
timestamp: timestamp,
type: tx_type
}
transaction_summary: %TransactionSummary{
address: address,
type: tx_type
}
}
) do
storage_nodes = transaction_storage_nodes(address, timestamp)
case ReplicationAttestation.validate(attestation) do
:ok ->
true

with :ok <-
ReplicationAttestation.validate(attestation),
:ok <- check_transaction_summary(storage_nodes, tx_summary) do
true
else
{:error, reason} ->
Logger.debug("Invalid attestation #{inspect(reason)} - #{inspect(attestation)}",
transaction_address: Base.encode16(address),
Expand All @@ -62,75 +51,6 @@ defmodule Archethic.BeaconChain.Slot.Validation do
end
end

defp check_transaction_summary(nodes, expected_summary, timeout \\ 500)

defp check_transaction_summary([], _, _), do: {:error, :network_issue}

defp check_transaction_summary(
nodes,
expected_summary = %TransactionSummary{
address: address,
type: type
},
_timeout
) do
conflict_resolver = fn results ->
case Enum.find(results, &match?(%TransactionSummary{address: ^address, type: ^type}, &1)) do
nil ->
%NotFound{}

tx_summary ->
tx_summary
end
end

case P2P.quorum_read(
nodes,
%GetTransactionSummary{address: address},
conflict_resolver
) do
{:ok, ^expected_summary} ->
:ok

{:ok, recv = %TransactionSummary{}} ->
Logger.warning(
"Transaction summary received is different #{inspect(recv)} - expect #{inspect(expected_summary)}",
transaction_address: Base.encode16(address),
transaction_type: type
)

{:ok, %NotFound{}} ->
Logger.warning("Transaction summary was not found",
transaction_address: Base.encode16(address),
transaction_type: type
)

{:error, :invalid_summary}

{:error, :network_issue} ->
{:error, :network_issue}
end
end

defp transaction_storage_nodes(address, timestamp) do
authorized_nodes =
case P2P.authorized_nodes(timestamp) do
[] ->
# Should only happen during bootstrap
P2P.authorized_nodes()

nodes ->
Enum.filter(nodes, & &1.available?)
end

address
# We are targeting the authorized nodes from the transaction validation to increase consistency and some guarantee
|> Election.chain_storage_nodes(authorized_nodes)
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
|> P2P.unprioritize_node(Crypto.first_node_public_key())
end

@doc """
Validate the end of node synchronization to ensure the list of nodes exists
"""
Expand Down
7 changes: 6 additions & 1 deletion lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,13 @@ defmodule Archethic.BeaconChain.Subset do
end

defp broadcast_beacon_slot(subset, next_time, slot) do
node_list =
next_time
|> P2P.authorized_nodes()
|> Enum.filter(& &1.available?)

subset
|> Election.beacon_storage_nodes(next_time, P2P.authorized_and_available_nodes())
|> Election.beacon_storage_nodes(next_time, node_list)
|> P2P.broadcast_message(%NewBeaconSlot{slot: slot})
end

Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/p2p/client/default_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule Archethic.P2P.Client.DefaultImpl do
) do
if node_public_key == Crypto.first_node_public_key() do
# if the node was itself just process the message
{:ok, Message.process(message)}
{:ok, Message.process(message, node_public_key)}
else
case Connection.send_message(node_public_key, message, timeout) do
{:ok, data} ->
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/p2p/listener_protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Archethic.P2P.ListenerProtocol do
)

start_processing_time = System.monotonic_time()
response = Archethic.P2P.Message.process(message)
response = Archethic.P2P.Message.process(message, sender_public_key)

:telemetry.execute(
[:archethic, :p2p, :handle_message],
Expand Down
Loading

0 comments on commit 46207b9

Please sign in to comment.