From 0cfa42c9378f5cee2a0d9ff37feb233977c6e179 Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Wed, 16 Nov 2022 10:12:05 +0100 Subject: [PATCH] Fix recursion of notify last address --- lib/archethic/mining/distributed_workflow.ex | 33 +++----- lib/archethic/mining/standalone_workflow.ex | 35 +++----- lib/archethic/mining/validation_context.ex | 13 +++ lib/archethic/p2p/message.ex | 39 ++++++--- .../notify_last_transaction_address.ex | 5 +- .../p2p/message/notify_previous_chain.ex | 11 +++ lib/archethic/replication.ex | 80 +++++++------------ test/archethic/p2p/messages_test.exs | 14 +++- test/archethic/replication_test.exs | 78 +++++++----------- 9 files changed, 142 insertions(+), 166 deletions(-) create mode 100644 lib/archethic/p2p/message/notify_previous_chain.ex diff --git a/lib/archethic/mining/distributed_workflow.ex b/lib/archethic/mining/distributed_workflow.ex index bf5c8b4f5..3adfb6492 100644 --- a/lib/archethic/mining/distributed_workflow.ex +++ b/lib/archethic/mining/distributed_workflow.ex @@ -30,13 +30,12 @@ defmodule Archethic.Mining.DistributedWorkflow do alias Archethic.P2P.Message.AddMiningContext alias Archethic.P2P.Message.CrossValidate alias Archethic.P2P.Message.CrossValidationDone + alias Archethic.P2P.Message.NotifyPreviousChain alias Archethic.P2P.Message.ReplicateTransactionChain alias Archethic.P2P.Message.ReplicateTransaction alias Archethic.P2P.Message.ValidationError alias Archethic.P2P.Node - alias Archethic.Replication - alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations @@ -749,29 +748,17 @@ defmodule Archethic.Mining.DistributedWorkflow do :notify_previous_chain, :replication, _data = %{ - context: %ValidationContext{ - transaction: tx, - validation_stamp: %ValidationStamp{timestamp: tx_timestamp} - } + context: + context = %ValidationContext{ + transaction: tx + } } ) do - previous_address = Transaction.previous_address(tx) - - genesis_address = - case TransactionChain.fetch_genesis_address_remotely(previous_address) do - {:ok, genesis_address} -> - genesis_address - - _ -> - previous_address - end - - Replication.acknowledge_previous_storage_nodes( - tx.address, - genesis_address, - previous_address, - tx_timestamp - ) + unless Transaction.network_type?(tx.type) do + context + |> ValidationContext.get_confirmed_replication_nodes() + |> P2P.broadcast_message(%NotifyPreviousChain{address: tx.address}) + end :stop end diff --git a/lib/archethic/mining/standalone_workflow.ex b/lib/archethic/mining/standalone_workflow.ex index 2b686c353..cc5ba4e25 100644 --- a/lib/archethic/mining/standalone_workflow.ex +++ b/lib/archethic/mining/standalone_workflow.ex @@ -20,16 +20,14 @@ defmodule Archethic.Mining.StandaloneWorkflow do alias Archethic.Mining.WorkflowRegistry alias Archethic.P2P + alias Archethic.P2P.Message.NotifyPreviousChain alias Archethic.P2P.Message.ReplicateTransaction alias Archethic.P2P.Message.ReplicateTransactionChain alias Archethic.P2P.Message.ValidationError alias Archethic.P2P.Node - alias Archethic.Replication - alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.Transaction.ValidationStamp alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations alias Archethic.TransactionChain.TransactionSummary @@ -331,26 +329,15 @@ defmodule Archethic.Mining.StandaloneWorkflow do ) end - defp notify_previous_chain(%ValidationContext{ - transaction: tx, - validation_stamp: %ValidationStamp{timestamp: tx_timestamp} - }) do - previous_address = Transaction.previous_address(tx) - - genesis_address = - case TransactionChain.fetch_genesis_address_remotely(previous_address) do - {:ok, genesis_address} -> - genesis_address - - _ -> - previous_address - end - - Replication.acknowledge_previous_storage_nodes( - tx.address, - genesis_address, - previous_address, - tx_timestamp - ) + defp notify_previous_chain( + context = %ValidationContext{ + transaction: tx + } + ) do + unless Transaction.network_type?(tx.type) do + context + |> ValidationContext.get_confirmed_replication_nodes() + |> P2P.broadcast_message(%NotifyPreviousChain{address: tx.address}) + end end end diff --git a/lib/archethic/mining/validation_context.ex b/lib/archethic/mining/validation_context.ex index ba1da4d2e..198ce34ee 100644 --- a/lib/archethic/mining/validation_context.ex +++ b/lib/archethic/mining/validation_context.ex @@ -1223,6 +1223,19 @@ defmodule Archethic.Mining.ValidationContext do |> Enum.count() == nb_confirmed_replications end + @doc """ + Return the list of nodes which confirmed the transaction replication + """ + @spec get_confirmed_replication_nodes(t()) :: list(Node.t()) + def get_confirmed_replication_nodes(%__MODULE__{ + chain_storage_nodes: chain_storage_nodes, + storage_nodes_confirmations: storage_nodes_confirmations + }) do + Enum.map(storage_nodes_confirmations, fn {index, _} -> + Enum.at(chain_storage_nodes, index) + end) + end + @doc """ Get the list of I/O replication nodes """ diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index a704e9569..7a5147608 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -58,6 +58,7 @@ defmodule Archethic.P2P.Message do alias __MODULE__.NodeList alias __MODULE__.NotFound alias __MODULE__.NotifyEndOfNodeSync + alias __MODULE__.NotifyPreviousChain alias __MODULE__.NotifyLastTransactionAddress alias __MODULE__.Ok alias __MODULE__.P2PView @@ -137,6 +138,7 @@ defmodule Archethic.P2P.Message do | ValidationError.t() | GetCurrentSummaries.t() | GetBeaconSummariesAggregate.t() + | NotifyPreviousChain.t() @type response :: Ok.t() @@ -382,10 +384,9 @@ defmodule Archethic.P2P.Message do def encode(%NotifyLastTransactionAddress{ last_address: last_address, genesis_address: genesis_address, - previous_address: previous_address, timestamp: timestamp }) do - <<22::8, last_address::binary, genesis_address::binary, previous_address::binary, + <<22::8, last_address::binary, genesis_address::binary, DateTime.to_unix(timestamp, :millisecond)::64>> end @@ -430,6 +431,10 @@ defmodule Archethic.P2P.Message do <<33::8, DateTime.to_unix(date)::32>> end + def encode(%NotifyPreviousChain{address: address}) do + <<34::8, address::binary>> + end + def encode(aggregate = %SummaryAggregate{}) do <<231::8, SummaryAggregate.serialize(aggregate)::bitstring>> end @@ -883,13 +888,11 @@ defmodule Archethic.P2P.Message do def decode(<<22::8, rest::bitstring>>) do {last_address, rest} = Utils.deserialize_address(rest) - {genesis_address, rest} = Utils.deserialize_address(rest) - {previous_address, <>} = Utils.deserialize_address(rest) + {genesis_address, <>} = Utils.deserialize_address(rest) {%NotifyLastTransactionAddress{ last_address: last_address, genesis_address: genesis_address, - previous_address: previous_address, timestamp: DateTime.from_unix!(timestamp, :millisecond) }, rest} end @@ -965,6 +968,11 @@ defmodule Archethic.P2P.Message do {%GetBeaconSummariesAggregate{date: DateTime.from_unix!(timestamp)}, rest} end + def decode(<<34::8, rest::bitstring>>) do + {address, rest} = Utils.deserialize_address(rest) + {%NotifyPreviousChain{address: address}, rest} + end + def decode(<<231::8, rest::bitstring>>) do SummaryAggregate.deserialize(rest) end @@ -1630,15 +1638,17 @@ defmodule Archethic.P2P.Message do def process(%NotifyLastTransactionAddress{ last_address: last_address, genesis_address: genesis_address, - previous_address: previous_address, timestamp: timestamp }) do - Replication.acknowledge_previous_storage_nodes( - last_address, - genesis_address, - previous_address, - timestamp - ) + with {local_last_address, local_last_timestamp} <- + TransactionChain.get_last_address(genesis_address), + true <- local_last_address != last_address, + :gt <- DateTime.compare(timestamp, local_last_timestamp) do + TransactionChain.register_last_address(genesis_address, last_address, timestamp) + + # Stop potential previous smart contract + Contracts.stop_contract(local_last_address) + end %Ok{} end @@ -1756,4 +1766,9 @@ defmodule Archethic.P2P.Message do %NotFound{} end end + + def process(%NotifyPreviousChain{address: address}) do + Replication.acknowledge_previous_storage_nodes(address) + %Ok{} + end end diff --git a/lib/archethic/p2p/message/notify_last_transaction_address.ex b/lib/archethic/p2p/message/notify_last_transaction_address.ex index 0de62218c..d2f0c2789 100644 --- a/lib/archethic/p2p/message/notify_last_transaction_address.ex +++ b/lib/archethic/p2p/message/notify_last_transaction_address.ex @@ -2,15 +2,14 @@ defmodule Archethic.P2P.Message.NotifyLastTransactionAddress do @moduledoc """ Represents a message with to notify a pool of the last address of a previous address """ - @enforce_keys [:last_address, :genesis_address, :previous_address, :timestamp] - defstruct [:last_address, :genesis_address, :previous_address, :timestamp] + @enforce_keys [:last_address, :genesis_address, :timestamp] + defstruct [:last_address, :genesis_address, :timestamp] alias Archethic.Crypto @type t :: %__MODULE__{ last_address: Crypto.versioned_hash(), genesis_address: Crypto.versioned_hash(), - previous_address: Crypto.versioned_hash(), timestamp: DateTime.t() } end diff --git a/lib/archethic/p2p/message/notify_previous_chain.ex b/lib/archethic/p2p/message/notify_previous_chain.ex new file mode 100644 index 000000000..aab8c1344 --- /dev/null +++ b/lib/archethic/p2p/message/notify_previous_chain.ex @@ -0,0 +1,11 @@ +defmodule Archethic.P2P.Message.NotifyPreviousChain do + @moduledoc """ + Represents a message used to notify previous chain storage nodes about the last transaction address + """ + + defstruct [:address] + + @type t :: %__MODULE__{ + address: binary() + } +end diff --git a/lib/archethic/replication.ex b/lib/archethic/replication.ex index 53e7ce383..8ec0bd010 100644 --- a/lib/archethic/replication.ex +++ b/lib/archethic/replication.ex @@ -331,59 +331,35 @@ defmodule Archethic.Replication do end @doc """ - Notify the previous storage pool than a new transaction on the chain is present + Notify the previous storage nodes than a new transaction on the chain is present """ - @spec acknowledge_previous_storage_nodes( - last_address :: binary(), - genesis_address :: binary(), - previous_address :: binary(), - tx_time :: DateTime.t() - ) :: :ok - def acknowledge_previous_storage_nodes( - last_address, - genesis_address, - previous_address, - timestamp = %DateTime{} - ) - when is_binary(last_address) and is_binary(genesis_address) and is_binary(previous_address) do - storage_nodes = - Election.chain_storage_nodes(previous_address, P2P.authorized_and_available_nodes()) - - # If the current is node was a previous storage nodes - if Utils.key_in_node_list?(storage_nodes, Crypto.first_node_public_key()) do - TransactionChain.register_last_address(genesis_address, last_address, timestamp) - Contracts.stop_contract(previous_address) - - case TransactionChain.get_transaction(previous_address, [:previous_public_key]) do - {:ok, tx} -> - next_previous_address = Transaction.previous_address(tx) - - previous_storage_nodes = - Election.chain_storage_nodes( - next_previous_address, - P2P.authorized_and_available_nodes() - ) - - # Take only the nodes which were not in the previous shard but previously on the chain - diff_nodes = previous_storage_nodes -- storage_nodes - - P2P.broadcast_message(diff_nodes, %NotifyLastTransactionAddress{ - last_address: last_address, - genesis_address: genesis_address, - previous_address: next_previous_address, - timestamp: timestamp - }) - - _ -> - :ok - end - else - P2P.broadcast_message(storage_nodes, %NotifyLastTransactionAddress{ - last_address: last_address, - genesis_address: genesis_address, - previous_address: previous_address, - timestamp: timestamp - }) + @spec acknowledge_previous_storage_nodes(address :: binary()) :: :ok + def acknowledge_previous_storage_nodes(address) when is_binary(address) do + case TransactionChain.get_genesis_address(address) do + ^address -> + # No transaction chain found as the DB request return the argument if not found. + :ok + + genesis_address -> + last_storage_nodes = + Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes()) + + {:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: timestamp}}} = + TransactionChain.get_transaction(address, validation_stamp: [:timestamp]) + + # Send a message to all the previous storage nodes + address + |> TransactionChain.list_chain_addresses() + |> Stream.flat_map(fn {address, _} -> + Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes()) + end) + |> Enum.uniq_by(& &1.first_public_key) + |> Enum.reject(&Utils.key_in_node_list?(last_storage_nodes, &1.first_public_key)) + |> P2P.broadcast_message(%NotifyLastTransactionAddress{ + last_address: address, + genesis_address: genesis_address, + timestamp: timestamp + }) end end diff --git a/test/archethic/p2p/messages_test.exs b/test/archethic/p2p/messages_test.exs index 8ebdf1467..b89b18b93 100644 --- a/test/archethic/p2p/messages_test.exs +++ b/test/archethic/p2p/messages_test.exs @@ -34,6 +34,7 @@ defmodule Archethic.P2P.MessageTest do alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Message.NotifyEndOfNodeSync alias Archethic.P2P.Message.NotifyLastTransactionAddress + alias Archethic.P2P.Message.NotifyPreviousChain alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.P2PView alias Archethic.P2P.Message.Ping @@ -862,7 +863,6 @@ defmodule Archethic.P2P.MessageTest do msg = %NotifyLastTransactionAddress{ last_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, genesis_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, - previous_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, timestamp: DateTime.utc_now() |> DateTime.truncate(:millisecond) } @@ -873,6 +873,18 @@ defmodule Archethic.P2P.MessageTest do |> elem(0) end + test "NotifyPreviousChain message" do + msg = %NotifyPreviousChain{ + address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + } + + assert msg == + msg + |> Message.encode() + |> Message.decode() + |> elem(0) + end + test "GetTransactionSummary message" do msg = %GetTransactionSummary{ address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> diff --git a/test/archethic/replication_test.exs b/test/archethic/replication_test.exs index ddf0208bc..52e5c1138 100644 --- a/test/archethic/replication_test.exs +++ b/test/archethic/replication_test.exs @@ -235,66 +235,42 @@ defmodule Archethic.ReplicationTest do %{tx | validation_stamp: validation_stamp, cross_validation_stamps: [cross_validation_stamp]} end - describe "acknowledge_previous_storage_nodes/2" do - test "should register new address on chain" do - MockDB - |> stub(:add_last_transaction_address, fn _address, _last_address, _ -> - :ok - end) - |> expect(:get_last_chain_address, fn _ -> {"@Alice2", DateTime.utc_now()} end) - - assert :ok = - Replication.acknowledge_previous_storage_nodes( - "@Alice2", - "@Alice0", - "@Alice1", - DateTime.utc_now() - ) + describe "acknowledge_previous_storage_nodes/1" do + test "should notify last transaction address for the previous storage nodes" do + me = self() - assert {"@Alice2", _} = TransactionChain.get_last_address("@Alice1") - end + Enum.each(0..50, fn i -> + P2P.add_and_connect_node(%Node{ + first_public_key: "key-#{i}", + last_public_key: "key-#{i}", + geo_patch: "#{Integer.to_string(i, 16)}A", + available?: true, + authorized?: true, + authorization_date: DateTime.utc_now() + }) + end) - test "should notify previous storage pool if transaction exists" do MockDB - |> stub(:add_last_transaction_address, fn _address, _last_address, _ -> - :ok - end) - |> expect(:get_last_chain_address, fn _ -> {"@Alice2", DateTime.utc_now()} end) - |> stub(:get_transaction, fn _, _ -> - {:ok, %Transaction{previous_public_key: "Alice1"}} + |> expect(:get_first_chain_address, fn _ -> "@Alice0" end) + |> expect(:get_transaction, fn _, _ -> + {:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: DateTime.utc_now()}}} end) - - me = self() + |> expect(:list_chain_addresses, fn _ -> [{"@Alice1", 0}] end) MockClient - |> stub(:send_message, fn _, %NotifyLastTransactionAddress{last_address: _}, _ -> - send(me, :notification_sent) + |> stub(:send_message, fn _, + %NotifyLastTransactionAddress{ + last_address: last_address, + genesis_address: genesis_address + }, + _ -> + send(me, {:last_address, last_address, genesis_address}) {:ok, %Ok{}} end) - P2P.add_and_connect_node(%Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: :crypto.strong_rand_bytes(32), - last_public_key: :crypto.strong_rand_bytes(32), - geo_patch: "AAA", - available?: true, - authorization_date: DateTime.utc_now(), - authorized?: true, - reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> - }) - - assert :ok = - Replication.acknowledge_previous_storage_nodes( - "@Alice2", - "@Alice0", - "@Alice1", - DateTime.utc_now() - ) - - assert {"@Alice2", _} = TransactionChain.get_last_address("@Alice1") - - assert_receive :notification_sent, 500 + assert :ok = Replication.acknowledge_previous_storage_nodes("@Alice2") + + assert_receive {:last_address, "@Alice2", "@Alice0"} end end end