Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure destination of elected message #674

Merged
merged 1 commit into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 86 additions & 18 deletions lib/archethic/beacon_chain/replication_attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do
alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.GetTransactionSummary
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Node

alias Archethic.TransactionChain.TransactionSummary

require Logger

defstruct [:transaction_summary, confirmations: [], version: 1]

@type t :: %__MODULE__{
Expand Down Expand Up @@ -156,18 +160,21 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do
@doc """
Determine if the attestation is cryptographically valid
"""
@spec validate(t()) ::
@spec validate(attestation :: t(), check_tx_summary_consistency? :: boolean()) ::
:ok
| {:error, :invalid_confirmations_signatures}
def validate(%__MODULE__{
transaction_summary:
tx_summary = %TransactionSummary{
address: tx_address,
type: tx_type,
timestamp: timestamp
},
confirmations: confirmations
}) do
def validate(
%__MODULE__{
transaction_summary:
tx_summary = %TransactionSummary{
address: tx_address,
type: tx_type,
timestamp: timestamp
},
confirmations: confirmations
},
check_summary_consistency? \\ true
) do
tx_summary_payload = TransactionSummary.serialize(tx_summary)

authorized_nodes =
Expand All @@ -182,20 +189,81 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do

storage_nodes = Election.chain_storage_nodes_with_type(tx_address, tx_type, authorized_nodes)

if valid_confirmations?(confirmations, tx_summary_payload, storage_nodes) do
with true <- check_summary_consistency?,
:ok <- check_transaction_summary(storage_nodes, tx_summary) do
validate_confirmations(confirmations, tx_summary_payload, storage_nodes)
else
false ->
validate_confirmations(confirmations, tx_summary_payload, storage_nodes)

{:error, _} = e ->
e
end
end

defp validate_confirmations([], _, _), do: {:error, :invalid_confirmations_signatures}

defp validate_confirmations(confirmations, tx_summary_payload, storage_nodes) do
valid_confirmations? =
Enum.all?(confirmations, fn {node_index, signature} ->
%Node{first_public_key: node_public_key} = Enum.at(storage_nodes, node_index)
Crypto.verify?(signature, tx_summary_payload, node_public_key)
end)

if valid_confirmations? do
:ok
else
{:error, :invalid_confirmations_signatures}
end
end

defp valid_confirmations?([], _, _), do: false
defp check_transaction_summary(nodes, expected_summary, timeout \\ 500)

defp valid_confirmations?(confirmations, tx_summary_payload, storage_nodes) do
confirmations
|> Enum.all?(fn {node_index, signature} ->
%Node{first_public_key: node_public_key} = Enum.at(storage_nodes, node_index)
Crypto.verify?(signature, tx_summary_payload, node_public_key)
end)
defp check_transaction_summary([], _, _), do: {:error, :network_issue}

defp check_transaction_summary(
nodes,
expected_summary = %TransactionSummary{
address: address,
type: type
},
_timeout
) do
conflict_resolver = fn results ->
case Enum.find(results, &match?(%TransactionSummary{address: ^address, type: ^type}, &1)) do
nil ->
%NotFound{}

tx_summary ->
tx_summary
end
end

case P2P.quorum_read(
nodes,
%GetTransactionSummary{address: address},
conflict_resolver
) do
{:ok, ^expected_summary} ->
:ok

{:ok, recv = %TransactionSummary{}} ->
Logger.warning(
"Transaction summary received is different #{inspect(recv)} - expect #{inspect(expected_summary)}",
transaction_address: Base.encode16(address),
transaction_type: type
)

{:ok, %NotFound{}} ->
Logger.warning("Transaction summary was not found",
transaction_address: Base.encode16(address),
transaction_type: type
)

{:error, :invalid_summary}

{:error, :network_issue} ->
{:error, :network_issue}
end
end
end
94 changes: 7 additions & 87 deletions lib/archethic/beacon_chain/slot/validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@ defmodule Archethic.BeaconChain.Slot.Validation do
alias Archethic.BeaconChain.Slot
alias Archethic.BeaconChain.Slot.EndOfNodeSync

alias Archethic.Crypto

alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.GetTransactionSummary
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Node

alias Archethic.TaskSupervisor
Expand All @@ -37,21 +31,16 @@ defmodule Archethic.BeaconChain.Slot.Validation do

defp valid_transaction_attestation(
attestation = %ReplicationAttestation{
transaction_summary:
tx_summary = %TransactionSummary{
address: address,
timestamp: timestamp,
type: tx_type
}
transaction_summary: %TransactionSummary{
address: address,
type: tx_type
}
}
) do
storage_nodes = transaction_storage_nodes(address, timestamp)
case ReplicationAttestation.validate(attestation) do
:ok ->
true

with :ok <-
ReplicationAttestation.validate(attestation),
:ok <- check_transaction_summary(storage_nodes, tx_summary) do
true
else
{:error, reason} ->
Logger.debug("Invalid attestation #{inspect(reason)} - #{inspect(attestation)}",
transaction_address: Base.encode16(address),
Expand All @@ -62,75 +51,6 @@ defmodule Archethic.BeaconChain.Slot.Validation do
end
end

defp check_transaction_summary(nodes, expected_summary, timeout \\ 500)

defp check_transaction_summary([], _, _), do: {:error, :network_issue}

defp check_transaction_summary(
nodes,
expected_summary = %TransactionSummary{
address: address,
type: type
},
_timeout
) do
conflict_resolver = fn results ->
case Enum.find(results, &match?(%TransactionSummary{address: ^address, type: ^type}, &1)) do
nil ->
%NotFound{}

tx_summary ->
tx_summary
end
end

case P2P.quorum_read(
nodes,
%GetTransactionSummary{address: address},
conflict_resolver
) do
{:ok, ^expected_summary} ->
:ok

{:ok, recv = %TransactionSummary{}} ->
Logger.warning(
"Transaction summary received is different #{inspect(recv)} - expect #{inspect(expected_summary)}",
transaction_address: Base.encode16(address),
transaction_type: type
)

{:ok, %NotFound{}} ->
Logger.warning("Transaction summary was not found",
transaction_address: Base.encode16(address),
transaction_type: type
)

{:error, :invalid_summary}

{:error, :network_issue} ->
{:error, :network_issue}
end
end

defp transaction_storage_nodes(address, timestamp) do
authorized_nodes =
case P2P.authorized_nodes(timestamp) do
[] ->
# Should only happen during bootstrap
P2P.authorized_nodes()

nodes ->
Enum.filter(nodes, & &1.available?)
end

address
# We are targeting the authorized nodes from the transaction validation to increase consistency and some guarantee
|> Election.chain_storage_nodes(authorized_nodes)
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
|> P2P.unprioritize_node(Crypto.first_node_public_key())
end

@doc """
Validate the end of node synchronization to ensure the list of nodes exists
"""
Expand Down
7 changes: 6 additions & 1 deletion lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,13 @@ defmodule Archethic.BeaconChain.Subset do
end

defp broadcast_beacon_slot(subset, next_time, slot) do
node_list =
next_time
|> P2P.authorized_nodes()
|> Enum.filter(& &1.available?)

subset
|> Election.beacon_storage_nodes(next_time, P2P.authorized_and_available_nodes())
|> Election.beacon_storage_nodes(next_time, node_list)
|> P2P.broadcast_message(%NewBeaconSlot{slot: slot})
end

Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/beacon_chain/summary_aggregate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ defmodule Archethic.BeaconChain.SummaryAggregate do
valid_attestations? =
if check_attestation? do
Enum.all?(transaction_attestations, fn attestation ->
ReplicationAttestation.validate(attestation) == :ok
ReplicationAttestation.validate(attestation, false) == :ok
end)
else
true
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/p2p/client/default_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule Archethic.P2P.Client.DefaultImpl do
) do
if node_public_key == Crypto.first_node_public_key() do
# if the node was itself just process the message
{:ok, Message.process(message)}
{:ok, Message.process(message, node_public_key)}
else
case Connection.send_message(node_public_key, message, timeout) do
{:ok, data} ->
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/p2p/listener_protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Archethic.P2P.ListenerProtocol do
)

start_processing_time = System.monotonic_time()
response = Archethic.P2P.Message.process(message)
response = Archethic.P2P.Message.process(message, sender_public_key)

:telemetry.execute(
[:archethic, :p2p, :handle_message],
Expand Down
Loading