diff --git a/lib/archethic/beacon_chain/replication_attestation.ex b/lib/archethic/beacon_chain/replication_attestation.ex index 0fa6a8e78..bc05a9c95 100644 --- a/lib/archethic/beacon_chain/replication_attestation.ex +++ b/lib/archethic/beacon_chain/replication_attestation.ex @@ -8,10 +8,14 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do alias Archethic.Election alias Archethic.P2P + alias Archethic.P2P.Message.GetTransactionSummary + alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Node alias Archethic.TransactionChain.TransactionSummary + require Logger + defstruct [:transaction_summary, confirmations: [], version: 1] @type t :: %__MODULE__{ @@ -156,18 +160,21 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do @doc """ Determine if the attestation is cryptographically valid """ - @spec validate(t()) :: + @spec validate(attestation :: t(), check_tx_summary_consistency? :: boolean()) :: :ok | {:error, :invalid_confirmations_signatures} - def validate(%__MODULE__{ - transaction_summary: - tx_summary = %TransactionSummary{ - address: tx_address, - type: tx_type, - timestamp: timestamp - }, - confirmations: confirmations - }) do + def validate( + %__MODULE__{ + transaction_summary: + tx_summary = %TransactionSummary{ + address: tx_address, + type: tx_type, + timestamp: timestamp + }, + confirmations: confirmations + }, + check_summary_consistency? \\ true + ) do tx_summary_payload = TransactionSummary.serialize(tx_summary) authorized_nodes = @@ -182,20 +189,81 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do storage_nodes = Election.chain_storage_nodes_with_type(tx_address, tx_type, authorized_nodes) - if valid_confirmations?(confirmations, tx_summary_payload, storage_nodes) do + with true <- check_summary_consistency?, + :ok <- check_transaction_summary(storage_nodes, tx_summary) do + validate_confirmations(confirmations, tx_summary_payload, storage_nodes) + else + false -> + validate_confirmations(confirmations, tx_summary_payload, storage_nodes) + + {:error, _} = e -> + e + end + end + + defp validate_confirmations([], _, _), do: {:error, :invalid_confirmations_signatures} + + defp validate_confirmations(confirmations, tx_summary_payload, storage_nodes) do + valid_confirmations? = + Enum.all?(confirmations, fn {node_index, signature} -> + %Node{first_public_key: node_public_key} = Enum.at(storage_nodes, node_index) + Crypto.verify?(signature, tx_summary_payload, node_public_key) + end) + + if valid_confirmations? do :ok else {:error, :invalid_confirmations_signatures} end end - defp valid_confirmations?([], _, _), do: false + defp check_transaction_summary(nodes, expected_summary, timeout \\ 500) - defp valid_confirmations?(confirmations, tx_summary_payload, storage_nodes) do - confirmations - |> Enum.all?(fn {node_index, signature} -> - %Node{first_public_key: node_public_key} = Enum.at(storage_nodes, node_index) - Crypto.verify?(signature, tx_summary_payload, node_public_key) - end) + defp check_transaction_summary([], _, _), do: {:error, :network_issue} + + defp check_transaction_summary( + nodes, + expected_summary = %TransactionSummary{ + address: address, + type: type + }, + _timeout + ) do + conflict_resolver = fn results -> + case Enum.find(results, &match?(%TransactionSummary{address: ^address, type: ^type}, &1)) do + nil -> + %NotFound{} + + tx_summary -> + tx_summary + end + end + + case P2P.quorum_read( + nodes, + %GetTransactionSummary{address: address}, + conflict_resolver + ) do + {:ok, ^expected_summary} -> + :ok + + {:ok, recv = %TransactionSummary{}} -> + Logger.warning( + "Transaction summary received is different #{inspect(recv)} - expect #{inspect(expected_summary)}", + transaction_address: Base.encode16(address), + transaction_type: type + ) + + {:ok, %NotFound{}} -> + Logger.warning("Transaction summary was not found", + transaction_address: Base.encode16(address), + transaction_type: type + ) + + {:error, :invalid_summary} + + {:error, :network_issue} -> + {:error, :network_issue} + end end end diff --git a/lib/archethic/beacon_chain/slot/validation.ex b/lib/archethic/beacon_chain/slot/validation.ex index 9ee3e6af3..78f9f6c26 100644 --- a/lib/archethic/beacon_chain/slot/validation.ex +++ b/lib/archethic/beacon_chain/slot/validation.ex @@ -5,13 +5,7 @@ defmodule Archethic.BeaconChain.Slot.Validation do alias Archethic.BeaconChain.Slot alias Archethic.BeaconChain.Slot.EndOfNodeSync - alias Archethic.Crypto - - alias Archethic.Election - alias Archethic.P2P - alias Archethic.P2P.Message.GetTransactionSummary - alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Node alias Archethic.TaskSupervisor @@ -37,21 +31,16 @@ defmodule Archethic.BeaconChain.Slot.Validation do defp valid_transaction_attestation( attestation = %ReplicationAttestation{ - transaction_summary: - tx_summary = %TransactionSummary{ - address: address, - timestamp: timestamp, - type: tx_type - } + transaction_summary: %TransactionSummary{ + address: address, + type: tx_type + } } ) do - storage_nodes = transaction_storage_nodes(address, timestamp) + case ReplicationAttestation.validate(attestation) do + :ok -> + true - with :ok <- - ReplicationAttestation.validate(attestation), - :ok <- check_transaction_summary(storage_nodes, tx_summary) do - true - else {:error, reason} -> Logger.debug("Invalid attestation #{inspect(reason)} - #{inspect(attestation)}", transaction_address: Base.encode16(address), @@ -62,75 +51,6 @@ defmodule Archethic.BeaconChain.Slot.Validation do end end - defp check_transaction_summary(nodes, expected_summary, timeout \\ 500) - - defp check_transaction_summary([], _, _), do: {:error, :network_issue} - - defp check_transaction_summary( - nodes, - expected_summary = %TransactionSummary{ - address: address, - type: type - }, - _timeout - ) do - conflict_resolver = fn results -> - case Enum.find(results, &match?(%TransactionSummary{address: ^address, type: ^type}, &1)) do - nil -> - %NotFound{} - - tx_summary -> - tx_summary - end - end - - case P2P.quorum_read( - nodes, - %GetTransactionSummary{address: address}, - conflict_resolver - ) do - {:ok, ^expected_summary} -> - :ok - - {:ok, recv = %TransactionSummary{}} -> - Logger.warning( - "Transaction summary received is different #{inspect(recv)} - expect #{inspect(expected_summary)}", - transaction_address: Base.encode16(address), - transaction_type: type - ) - - {:ok, %NotFound{}} -> - Logger.warning("Transaction summary was not found", - transaction_address: Base.encode16(address), - transaction_type: type - ) - - {:error, :invalid_summary} - - {:error, :network_issue} -> - {:error, :network_issue} - end - end - - defp transaction_storage_nodes(address, timestamp) do - authorized_nodes = - case P2P.authorized_nodes(timestamp) do - [] -> - # Should only happen during bootstrap - P2P.authorized_nodes() - - nodes -> - Enum.filter(nodes, & &1.available?) - end - - address - # We are targeting the authorized nodes from the transaction validation to increase consistency and some guarantee - |> Election.chain_storage_nodes(authorized_nodes) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) - |> P2P.unprioritize_node(Crypto.first_node_public_key()) - end - @doc """ Validate the end of node synchronization to ensure the list of nodes exists """ diff --git a/lib/archethic/beacon_chain/subset.ex b/lib/archethic/beacon_chain/subset.ex index f8f360ad1..09fc928a2 100644 --- a/lib/archethic/beacon_chain/subset.ex +++ b/lib/archethic/beacon_chain/subset.ex @@ -323,8 +323,13 @@ defmodule Archethic.BeaconChain.Subset do end defp broadcast_beacon_slot(subset, next_time, slot) do + node_list = + next_time + |> P2P.authorized_nodes() + |> Enum.filter(& &1.available?) + subset - |> Election.beacon_storage_nodes(next_time, P2P.authorized_and_available_nodes()) + |> Election.beacon_storage_nodes(next_time, node_list) |> P2P.broadcast_message(%NewBeaconSlot{slot: slot}) end diff --git a/lib/archethic/beacon_chain/summary_aggregate.ex b/lib/archethic/beacon_chain/summary_aggregate.ex index 37cab0175..8a992ddbe 100644 --- a/lib/archethic/beacon_chain/summary_aggregate.ex +++ b/lib/archethic/beacon_chain/summary_aggregate.ex @@ -47,7 +47,7 @@ defmodule Archethic.BeaconChain.SummaryAggregate do valid_attestations? = if check_attestation? do Enum.all?(transaction_attestations, fn attestation -> - ReplicationAttestation.validate(attestation) == :ok + ReplicationAttestation.validate(attestation, false) == :ok end) else true diff --git a/lib/archethic/p2p/client/default_impl.ex b/lib/archethic/p2p/client/default_impl.ex index 8f9c4d839..ab19bc33d 100644 --- a/lib/archethic/p2p/client/default_impl.ex +++ b/lib/archethic/p2p/client/default_impl.ex @@ -67,7 +67,7 @@ defmodule Archethic.P2P.Client.DefaultImpl do ) do if node_public_key == Crypto.first_node_public_key() do # if the node was itself just process the message - {:ok, Message.process(message)} + {:ok, Message.process(message, node_public_key)} else case Connection.send_message(node_public_key, message, timeout) do {:ok, data} -> diff --git a/lib/archethic/p2p/listener_protocol.ex b/lib/archethic/p2p/listener_protocol.ex index 94c9aa025..350d7692e 100644 --- a/lib/archethic/p2p/listener_protocol.ex +++ b/lib/archethic/p2p/listener_protocol.ex @@ -52,7 +52,7 @@ defmodule Archethic.P2P.ListenerProtocol do ) start_processing_time = System.monotonic_time() - response = Archethic.P2P.Message.process(message) + response = Archethic.P2P.Message.process(message, sender_public_key) :telemetry.execute( [:archethic, :p2p, :handle_message], diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index 7a5147608..5e6fd61c8 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -16,6 +16,8 @@ defmodule Archethic.P2P.Message do alias Archethic.Crypto + alias Archethic.Election + alias Archethic.Mining alias Archethic.P2P @@ -86,6 +88,8 @@ defmodule Archethic.P2P.Message do alias Archethic.TransactionChain.Transaction.CrossValidationStamp alias Archethic.TransactionChain.Transaction.ValidationStamp + alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations + alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput alias Archethic.TransactionChain.TransactionInput @@ -1280,7 +1284,8 @@ defmodule Archethic.P2P.Message do @doc """ Handle a P2P message by processing it and return list of responses to be streamed back to the client """ - def process(%GetBootstrappingNodes{patch: patch}) do + @spec process(request(), Crypto.key()) :: response() + def process(%GetBootstrappingNodes{patch: patch}, _) do top_nodes = P2P.authorized_and_available_nodes() closest_nodes = @@ -1294,17 +1299,17 @@ defmodule Archethic.P2P.Message do } end - def process(%GetStorageNonce{public_key: public_key}) do + def process(%GetStorageNonce{public_key: public_key}, _) do %EncryptedStorageNonce{ digest: Crypto.encrypt_storage_nonce(public_key) } end - def process(%ListNodes{}) do + def process(%ListNodes{}, _) do %NodeList{nodes: P2P.list_nodes()} end - def process(%NewTransaction{transaction: tx}) do + def process(%NewTransaction{transaction: tx}, _) do case Archethic.send_new_transaction(tx) do :ok -> %Ok{} @@ -1314,12 +1319,12 @@ defmodule Archethic.P2P.Message do end end - def process(%ValidationError{context: context, reason: reason, address: address}) do + def process(%ValidationError{context: context, reason: reason, address: address}, _) do TransactionSubscriber.report_error(address, context, reason) %Ok{} end - def process(%GetTransaction{address: tx_address}) do + def process(%GetTransaction{address: tx_address}, _) do case TransactionChain.get_transaction(tx_address) do {:ok, tx} -> tx @@ -1333,10 +1338,13 @@ defmodule Archethic.P2P.Message do end # paging_state recieved contains binary offset for next page , to be used for query - def process(%GetTransactionChain{ - address: tx_address, - paging_state: paging_state - }) do + def process( + %GetTransactionChain{ + address: tx_address, + paging_state: paging_state + }, + _ + ) do {chain, more?, paging_state} = tx_address |> TransactionChain.get([], paging_state: paging_state) @@ -1346,7 +1354,7 @@ defmodule Archethic.P2P.Message do %TransactionList{transactions: chain, paging_state: paging_state, more?: more?} end - def process(%GetUnspentOutputs{address: tx_address, offset: offset}) do + def process(%GetUnspentOutputs{address: tx_address, offset: offset}, _) do utxos = Account.get_unspent_outputs(tx_address) utxos_length = length(utxos) @@ -1388,7 +1396,7 @@ defmodule Archethic.P2P.Message do } end - def process(%GetP2PView{node_public_keys: node_public_keys}) do + def process(%GetP2PView{node_public_keys: node_public_keys}, _) do nodes = Enum.map(node_public_keys, fn key -> {:ok, node} = P2P.get_node_info(key) @@ -1399,26 +1407,38 @@ defmodule Archethic.P2P.Message do %P2PView{nodes_view: view} end - def process(%StartMining{ - transaction: tx = %Transaction{}, - welcome_node_public_key: welcome_node_public_key, - validation_node_public_keys: validation_nodes - }) - when length(validation_nodes) > 0 do + def process( + %StartMining{ + transaction: tx = %Transaction{}, + welcome_node_public_key: welcome_node_public_key, + validation_node_public_keys: validation_nodes + }, + _ + ) do with {:election, true} <- {:election, Mining.valid_election?(tx, validation_nodes)}, + {:elected, true} <- + {:elected, Enum.any?(validation_nodes, &(&1 == Crypto.last_node_public_key()))}, {:mining, false} <- {:mining, Mining.processing?(tx.address)} do {:ok, _} = Mining.start(tx, welcome_node_public_key, validation_nodes) %Ok{} else - {:election, _} -> + {:election, false} -> Logger.error("Invalid validation node election", transaction_address: Base.encode16(tx.address), transaction_type: tx.type ) - raise "Invalid validate node election" + %Error{reason: :network_issue} + + {:elected, false} -> + Logger.error("Unexpected start mining message", + transaction_address: Base.encode16(tx.address), + transaction_type: tx.type + ) + + %Error{reason: :network_issue} - {:mining, _} -> + {:mining, true} -> Logger.warning("Transaction already in mining process", transaction_address: Base.encode16(tx.address), transaction_type: tx.type @@ -1428,14 +1448,17 @@ defmodule Archethic.P2P.Message do end end - def process(%AddMiningContext{ - address: tx_address, - validation_node_public_key: validation_node, - previous_storage_nodes_public_keys: previous_storage_nodes_public_keys, - chain_storage_nodes_view: chain_storage_nodes_view, - beacon_storage_nodes_view: beacon_storage_nodes_view, - io_storage_nodes_view: io_storage_nodes_view - }) do + def process( + %AddMiningContext{ + address: tx_address, + validation_node_public_key: validation_node, + previous_storage_nodes_public_keys: previous_storage_nodes_public_keys, + chain_storage_nodes_view: chain_storage_nodes_view, + beacon_storage_nodes_view: beacon_storage_nodes_view, + io_storage_nodes_view: io_storage_nodes_view + }, + _ + ) do :ok = Mining.add_mining_context( tx_address, @@ -1449,89 +1472,118 @@ defmodule Archethic.P2P.Message do %Ok{} end - def process(%ReplicateTransactionChain{ - transaction: tx, - replying_node: replying_node_public_key - }) do - Task.Supervisor.start_child(TaskSupervisor, fn -> - response = - case Replication.validate_and_store_transaction_chain(tx) do - :ok -> - tx_summary = TransactionSummary.from_transaction(tx) - - %AcknowledgeStorage{ - address: tx.address, - signature: - Crypto.sign_with_first_node_key(TransactionSummary.serialize(tx_summary)), - node_public_key: Crypto.first_node_public_key() - } - - {:error, :transaction_already_exists} -> - %ReplicationError{address: tx.address, reason: :transaction_already_exists} - - {:error, invalid_tx_error} -> - %ReplicationError{address: tx.address, reason: invalid_tx_error} - end + def process( + %ReplicateTransactionChain{ + transaction: tx = %Transaction{address: tx_address, type: tx_type}, + replying_node: replying_node_public_key + }, + _ + ) do + # We don't check the election for network transactions because all the nodes receive the chain replication message + # The chain storage nodes election is all the authorized nodes but during I/O replication, we send this message to enforce + # the synchronization of the network chains + if Transaction.network_type?(tx_type) do + process_replication_chain(tx, replying_node_public_key) + else + storage_nodes = + Election.chain_storage_nodes_with_type( + tx_address, + tx_type, + P2P.authorized_and_available_nodes() + ) - if replying_node_public_key do - P2P.send_message(replying_node_public_key, response) + # Replicate transaction chain only if the current node is one of the chain storage nodes + if Utils.key_in_node_list?(storage_nodes, Crypto.first_node_public_key()) do + process_replication_chain(tx, replying_node_public_key) end - end) + end %Ok{} end - def process(%ReplicateTransaction{transaction: tx}) do - case Replication.validate_and_store_transaction(tx) do - :ok -> - %Ok{} + def process( + %ReplicateTransaction{ + transaction: + tx = %Transaction{validation_stamp: %ValidationStamp{timestamp: validation_time}} + }, + _ + ) do + resolved_addresses = TransactionChain.resolve_transaction_addresses(tx, validation_time) - {:error, :transaction_already_exists} -> - %ReplicationError{address: tx.address, reason: :transaction_already_exists} + io_storage_nodes = + if Transaction.network_type?(tx.type) do + P2P.list_nodes() + else + resolved_addresses + |> Enum.map(fn {_origin, resolved} -> resolved end) + |> Enum.concat([LedgerOperations.burning_address()]) + |> Election.io_storage_nodes(P2P.authorized_and_available_nodes()) + end - {:error, invalid_tx_reason} -> - %ReplicationError{address: tx.address, reason: invalid_tx_reason} + # Replicate tx only if the current node is one of the I/O storage nodes + if Utils.key_in_node_list?(io_storage_nodes, Crypto.first_node_public_key()) do + case Replication.validate_and_store_transaction(tx) do + :ok -> + %Ok{} + + {:error, :transaction_already_exists} -> + %ReplicationError{address: tx.address, reason: :transaction_already_exists} + + {:error, invalid_tx_reason} -> + %ReplicationError{address: tx.address, reason: invalid_tx_reason} + end + else + %Ok{} end end - def process(%AcknowledgeStorage{ - address: address, - signature: signature, - node_public_key: node_public_key - }) do + def process( + %AcknowledgeStorage{ + address: address, + signature: signature, + node_public_key: node_public_key + }, + _ + ) do Mining.confirm_replication(address, signature, node_public_key) %Ok{} end - def process(%ReplicationError{ - address: address, - reason: reason - }) do + def process( + %ReplicationError{ + address: address, + reason: reason + }, + _ + ) do Mining.notify_replication_error(address, reason) %Ok{} end - def process(%CrossValidate{ - address: tx_address, - validation_stamp: stamp, - replication_tree: replication_tree, - confirmed_validation_nodes: confirmed_validation_nodes - }) do + def process( + %CrossValidate{ + address: tx_address, + validation_stamp: stamp, + replication_tree: replication_tree, + confirmed_validation_nodes: confirmed_validation_nodes + }, + _ + ) do Mining.cross_validate(tx_address, stamp, replication_tree, confirmed_validation_nodes) %Ok{} end - def process(%CrossValidationDone{address: tx_address, cross_validation_stamp: stamp}) do + def process(%CrossValidationDone{address: tx_address, cross_validation_stamp: stamp}, _) do Mining.add_cross_validation_stamp(tx_address, stamp) %Ok{} end - def process(%NotifyEndOfNodeSync{node_public_key: public_key, timestamp: timestamp}) do + def process(%NotifyEndOfNodeSync{node_public_key: public_key, timestamp: timestamp}, _) do BeaconChain.add_end_of_node_sync(public_key, timestamp) %Ok{} end - def process(%GetLastTransaction{address: address}) do + def process(%GetLastTransaction{address: address}, _) do case TransactionChain.get_last_transaction(address) do {:ok, tx} -> tx @@ -1544,7 +1596,7 @@ defmodule Archethic.P2P.Message do end end - def process(%GetBalance{address: address}) do + def process(%GetBalance{address: address}, _) do %{uco: uco, token: token} = Account.get_balance(address) %Balance{ @@ -1553,7 +1605,7 @@ defmodule Archethic.P2P.Message do } end - def process(%GetTransactionInputs{address: address, offset: offset, limit: limit}) do + def process(%GetTransactionInputs{address: address, offset: offset, limit: limit}, _) do contract_inputs = address |> Contracts.list_contract_transactions() @@ -1612,34 +1664,37 @@ defmodule Archethic.P2P.Message do end # Returns the length of the transaction chain - def process(%GetTransactionChainLength{address: address}) do + def process(%GetTransactionChainLength{address: address}, _) do %TransactionChainLength{ length: TransactionChain.size(address) } end # Returns the first public_key for a given public_key and if the public_key is used for the first time, return the same public_key. - def process(%GetFirstPublicKey{public_key: public_key}) do + def process(%GetFirstPublicKey{public_key: public_key}, _) do %FirstPublicKey{ public_key: TransactionChain.get_first_public_key(public_key) } end - def process(%GetFirstAddress{address: address}) do + def process(%GetFirstAddress{address: address}, _) do genesis_address = TransactionChain.get_genesis_address(address) %FirstAddress{address: genesis_address} end - def process(%GetLastTransactionAddress{address: address, timestamp: timestamp}) do + def process(%GetLastTransactionAddress{address: address, timestamp: timestamp}, _) do {address, time} = TransactionChain.get_last_address(address, timestamp) %LastTransactionAddress{address: address, timestamp: time} end - def process(%NotifyLastTransactionAddress{ - last_address: last_address, - genesis_address: genesis_address, - timestamp: timestamp - }) do + def process( + %NotifyLastTransactionAddress{ + last_address: last_address, + genesis_address: genesis_address, + timestamp: timestamp + }, + _ + ) do with {local_last_address, local_last_timestamp} <- TransactionChain.get_last_address(genesis_address), true <- local_last_address != last_address, @@ -1653,7 +1708,7 @@ defmodule Archethic.P2P.Message do %Ok{} end - def process(%GetTransactionSummary{address: address}) do + def process(%GetTransactionSummary{address: address}, _) do case TransactionChain.get_transaction_summary(address) do {:ok, summary} -> summary @@ -1663,7 +1718,7 @@ defmodule Archethic.P2P.Message do end end - def process(%GetCurrentSummaries{subsets: subsets}) do + def process(%GetCurrentSummaries{subsets: subsets}, _) do transaction_summaries = Enum.flat_map(subsets, fn subset -> transaction_summaries = BeaconChain.get_summary_slots(subset) @@ -1683,14 +1738,14 @@ defmodule Archethic.P2P.Message do } end - def process(%NodeAvailability{public_key: public_key}) do + def process(%NodeAvailability{public_key: public_key}, _) do P2P.set_node_globally_available(public_key) %Ok{} end - def process(%Ping{}), do: %Ok{} + def process(%Ping{}, _), do: %Ok{} - def process(%GetBeaconSummary{address: address}) do + def process(%GetBeaconSummary{address: address}, _) do case BeaconChain.get_summary(address) do {:ok, summary} -> summary @@ -1700,38 +1755,54 @@ defmodule Archethic.P2P.Message do end end - def process(%NewBeaconSlot{slot: slot}) do - case BeaconChain.load_slot(slot) do - :ok -> - %Ok{} + def process(%NewBeaconSlot{slot: slot = %Slot{subset: subset, slot_time: slot_time}}, _) do + summary_time = BeaconChain.next_summary_date(slot_time) + node_list = P2P.authorized_nodes(summary_time) + + beacon_summary_nodes = + Election.beacon_storage_nodes( + subset, + summary_time, + node_list, + Election.get_storage_constraints() + ) + + # Load BeaconChain's slot only for the summary nodes + with true <- Utils.key_in_node_list?(beacon_summary_nodes, Crypto.first_node_public_key()), + :ok <- BeaconChain.load_slot(slot) do + %Ok{} + else + false -> + Logger.error("Unexpected beacon slot broadcast") + %Error{reason: :network_issue} :error -> %Error{reason: :invalid_transaction} end end - def process(%GetBeaconSummaries{addresses: addresses}) do + def process(%GetBeaconSummaries{addresses: addresses}, _) do %BeaconSummaryList{ summaries: BeaconChain.get_beacon_summaries(addresses) } end - def process(%RegisterBeaconUpdates{node_public_key: node_public_key, subset: subset}) do + def process(%RegisterBeaconUpdates{node_public_key: node_public_key, subset: subset}, _) do BeaconChain.subscribe_for_beacon_updates(subset, node_public_key) %Ok{} end - def process(%BeaconUpdate{transaction_attestations: transaction_attestations}) do + def process(%BeaconUpdate{transaction_attestations: transaction_attestations}, sender) do Enum.each(transaction_attestations, fn %ReplicationAttestation{ transaction_summary: tx_summary } -> - process(tx_summary) + process(tx_summary, sender) end) %Ok{} end - def process(tx_summary = %TransactionSummary{}) do + def process(tx_summary = %TransactionSummary{}, _) do PubSub.notify_transaction_attestation(tx_summary) %Ok{} @@ -1740,7 +1811,8 @@ defmodule Archethic.P2P.Message do def process( attestation = %ReplicationAttestation{ transaction_summary: %TransactionSummary{address: tx_address, type: tx_type} - } + }, + _ ) do case ReplicationAttestation.validate(attestation) do :ok -> @@ -1757,7 +1829,7 @@ defmodule Archethic.P2P.Message do end end - def process(%GetBeaconSummariesAggregate{date: date}) do + def process(%GetBeaconSummariesAggregate{date: date}, _) do case BeaconChain.get_summaries_aggregate(date) do {:ok, aggregate} -> aggregate @@ -1767,8 +1839,35 @@ defmodule Archethic.P2P.Message do end end - def process(%NotifyPreviousChain{address: address}) do + def process(%NotifyPreviousChain{address: address}, _) do Replication.acknowledge_previous_storage_nodes(address) %Ok{} end + + defp process_replication_chain(tx, replying_node_public_key) do + Task.Supervisor.start_child(TaskSupervisor, fn -> + response = + case Replication.validate_and_store_transaction_chain(tx) do + :ok -> + tx_summary = TransactionSummary.from_transaction(tx) + + %AcknowledgeStorage{ + address: tx.address, + signature: + Crypto.sign_with_first_node_key(TransactionSummary.serialize(tx_summary)), + node_public_key: Crypto.first_node_public_key() + } + + {:error, :transaction_already_exists} -> + %ReplicationError{address: tx.address, reason: :transaction_already_exists} + + {:error, invalid_tx_error} -> + %ReplicationError{address: tx.address, reason: invalid_tx_error} + end + + if replying_node_public_key do + P2P.send_message(replying_node_public_key, response) + end + end) + end end diff --git a/lib/archethic/utils/regression.ex b/lib/archethic/utils/regression.ex index 686f21c20..e148d1e2b 100644 --- a/lib/archethic/utils/regression.ex +++ b/lib/archethic/utils/regression.ex @@ -5,13 +5,13 @@ defmodule Archethic.Utils.Regression do require Logger alias Archethic.Utils - alias Archethic.Utils.Regression.Benchmark.P2PMessage alias Archethic.Utils.Regression.Playbook.SmartContract alias Archethic.Utils.Regression.Playbook.UCO alias Archethic.Utils.WebClient alias Archethic.Utils.Regression.Benchmark.EndToEndValidation + alias Archethic.Utils.Regression.Benchmark.P2PMessage @playbooks [UCO, SmartContract] @benchmarks [P2PMessage, EndToEndValidation] diff --git a/test/archethic/beacon_chain_test.exs b/test/archethic/beacon_chain_test.exs index 2e8260ea7..9f1b4006d 100644 --- a/test/archethic/beacon_chain_test.exs +++ b/test/archethic/beacon_chain_test.exs @@ -19,6 +19,7 @@ defmodule Archethic.BeaconChainTest do alias Archethic.P2P alias Archethic.P2P.Message.GetBeaconSummaries + alias Archethic.P2P.Message.GetTransactionSummary alias Archethic.P2P.Message.BeaconSummaryList alias Archethic.P2P.Node @@ -217,6 +218,9 @@ defmodule Archethic.BeaconChainTest do |> stub(:send_message, fn _, %GetBeaconSummaries{}, _ -> {:ok, %BeaconSummaryList{summaries: [beacon_summary]}} + + _, %GetTransactionSummary{}, _ -> + {:ok, tx_summary} end) %SummaryAggregate{transaction_summaries: transaction_summaries} = @@ -314,6 +318,9 @@ defmodule Archethic.BeaconChainTest do ^node4, %GetBeaconSummaries{}, _ -> {:ok, %BeaconSummaryList{summaries: [summary_v2]}} + + _, %GetTransactionSummary{}, _ -> + {:ok, tx_summary} end) %SummaryAggregate{transaction_summaries: transaction_summaries} =