From 37bfba390a8119ccdd2527c1f381291731fb599e Mon Sep 17 00:00:00 2001 From: Apoorv-2204 <90304197+apoorv-2204@users.noreply.github.com> Date: Fri, 9 Sep 2022 12:30:12 +0530 Subject: [PATCH] enforce resync of network txs (#515) If there is quick disconnection and re-connection of a (authorised and available) node, it is a possibility that it will miss some oracle and node shared secrets transactions , which results in inconsistent node state.Post bootstrap we must resync node shared secrets and oracle txs so that current node state is same as other nodes state. So during validation it will not report invalid tx --- lib/archethic/bootstrap.ex | 63 +++++- test/archethic/bootstrap_test.exs | 307 ++++++++++++++++++++++++---- test/support/transaction_factory.ex | 58 +++++- 3 files changed, 381 insertions(+), 47 deletions(-) diff --git a/lib/archethic/bootstrap.ex b/lib/archethic/bootstrap.ex index 7f0e75af5..2a21cd55e 100644 --- a/lib/archethic/bootstrap.ex +++ b/lib/archethic/bootstrap.ex @@ -15,7 +15,9 @@ defmodule Archethic.Bootstrap do P2P, P2P.Node, P2P.Listener, - SelfRepair + SelfRepair, + TransactionChain, + Replication } require Logger @@ -209,6 +211,8 @@ defmodule Archethic.Bootstrap do end Archethic.Bootstrap.NetworkConstraints.persist_genesis_address() + resync_network_chain() + Sync.publish_end_of_sync() SelfRepair.start_scheduler() @@ -217,6 +221,63 @@ defmodule Archethic.Bootstrap do Listener.listen() end + def resync_network_chain() do + Logger.info("Enforced Resync: Started!") + + if P2P.authorized_node?() && P2P.available_node?() do + # evict this node + nodes = + Enum.reject( + P2P.authorized_and_available_nodes(), + &(&1.first_public_key == Crypto.first_node_public_key()) + ) + + do_resync_network_chain([:oracle, :node_shared_secrets], nodes) + end + end + + @spec do_resync_network_chain(list(atom), list(P2P.Node.t()) | []) :: :ok + def do_resync_network_chain(_type_list, _nodes = []), + do: Logger.info("Enforce Reync of Network Txs: failure, No-Nodes") + + # by type: Get gen addr, get last address (remotely & locally) + # compare, if dont match, fetch last tx remotely + def do_resync_network_chain(type_list, nodes) when is_list(nodes) do + Task.Supervisor.async_stream_nolink(Archethic.TaskSupervisor, type_list, fn type -> + with addr when is_binary(addr) <- get_genesis_addr(type), + {:ok, rem_last_addr} <- TransactionChain.resolve_last_address(addr), + {local_last_addr, _} <- TransactionChain.get_last_address(addr), + false <- rem_last_addr == local_last_addr, + {:ok, tx} <- TransactionChain.fetch_transaction_remotely(rem_last_addr, nodes), + :ok <- Replication.validate_and_store_transaction_chain(tx) do + Logger.info("Enforced Resync: Success", transaction_type: type) + :ok + else + true -> + Logger.info("Enforced Resync: No new transaction to sync", transaction_type: type) + :ok + + e when e in [nil, []] -> + Logger.debug("Enforced Resync: Transaction not available", transaction_type: type) + :ok + + e -> + Logger.debug("Enforced Resync: Unexpected Error", transaction_type: type) + Logger.debug(e) + end + end) + |> Stream.run() + end + + @spec get_genesis_addr(:node_shared_secrets | :oracle) :: binary() | nil + defp get_genesis_addr(:oracle) do + Archethic.OracleChain.get_gen_addr().current |> elem(0) + end + + defp get_genesis_addr(:node_shared_secrets) do + Archethic.SharedSecrets.get_gen_addr(:node_shared_secrets) + end + defp first_initialization( ip, port, diff --git a/test/archethic/bootstrap_test.exs b/test/archethic/bootstrap_test.exs index 6eff8e8c9..51e0dce8f 100644 --- a/test/archethic/bootstrap_test.exs +++ b/test/archethic/bootstrap_test.exs @@ -1,52 +1,57 @@ defmodule Archethic.BootstrapTest do use ArchethicCase - alias Archethic.Crypto + alias Archethic.{ + Bootstrap, + Crypto, + P2P, + P2P.BootstrappingSeeds, + P2P.Node, + Replication, + SharedSecrets, + SharedSecrets.NodeRenewalScheduler, + TransactionChain, + TransactionFactory + } + + alias Archethic.P2P.Message.{ + GetTransactionChainLength, + TransactionChainLength, + BootstrappingNodes, + EncryptedStorageNonce, + GetBootstrappingNodes, + GetLastTransactionAddress, + GetStorageNonce, + GetTransaction, + GetTransactionChain, + GetTransactionSummary, + GetTransactionInputs, + GetFirstAddress, + FirstAddress, + LastTransactionAddress, + ListNodes, + NewTransaction, + NodeList, + NotFound, + NotifyEndOfNodeSync, + TransactionList, + TransactionInputList, + Ok, + GetFirstAddress, + NotFound + } + + alias TransactionChain.{ + Transaction, + TransactionSummary, + Transaction.ValidationStamp, + Transaction.ValidationStamp.LedgerOperations + } alias Archethic.BeaconChain.SlotTimer, as: BeaconSlotTimer alias Archethic.BeaconChain.SummaryTimer, as: BeaconSummaryTimer - - alias Archethic.Bootstrap - - alias Archethic.P2P - alias Archethic.P2P.BootstrappingSeeds - alias Archethic.P2P.Message.GetTransactionChainLength - alias Archethic.P2P.Message.TransactionChainLength - alias Archethic.P2P.Message.BootstrappingNodes - alias Archethic.P2P.Message.EncryptedStorageNonce - alias Archethic.P2P.Message.GetBootstrappingNodes - alias Archethic.P2P.Message.GetLastTransactionAddress - alias Archethic.P2P.Message.GetStorageNonce - alias Archethic.P2P.Message.GetTransaction - alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.GetTransactionSummary - alias Archethic.P2P.Message.GetTransactionInputs - alias Archethic.P2P.Message.LastTransactionAddress - alias Archethic.P2P.Message.ListNodes - alias Archethic.P2P.Message.NewTransaction - alias Archethic.P2P.Message.NodeList - alias Archethic.P2P.Message.NotFound - alias Archethic.P2P.Message.NotifyEndOfNodeSync - alias Archethic.P2P.Message.TransactionList - alias Archethic.P2P.Message.TransactionInputList - alias Archethic.P2P.Message.Ok - alias Archethic.P2P.Node - alias Archethic.P2P.Message.GetFirstAddress - alias Archethic.P2P.Message.NotFound - - alias Archethic.Replication - alias Archethic.SelfRepair.Scheduler, as: SelfRepairScheduler - alias Archethic.SharedSecrets - alias Archethic.SharedSecrets.NodeRenewalScheduler - - alias Archethic.TransactionChain - alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.Transaction.ValidationStamp - alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations - alias Archethic.TransactionChain.TransactionSummary - alias Archethic.Reward.MemTables.RewardTokens, as: RewardMemTable alias Archethic.Reward.MemTablesLoader, as: RewardTableLoader @@ -100,6 +105,10 @@ defmodule Archethic.BootstrapTest do end describe "run/5" do + setup do + :persistent_term.put(:node_shared_secrets_gen_addr, nil) + end + test "should initialize the network when nothing is set before" do MockClient |> stub(:send_message, fn @@ -431,4 +440,220 @@ defmodule Archethic.BootstrapTest do Process.sleep(100) end end + + describe "resync_network_chain/2 nss_chain" do + setup do + p2p_context() + + curr_time = DateTime.utc_now() + + txn0 = + TransactionFactory.create_network_tx(:node_shared_secrets, + index: 0, + timestamp: curr_time |> DateTime.add(-14_400, :second), + prev_txn: [] + ) + + txn1 = + TransactionFactory.create_network_tx(:node_shared_secrets, + index: 1, + timestamp: curr_time |> DateTime.add(-14_400, :second), + prev_txn: [txn0] + ) + + txn2 = + TransactionFactory.create_network_tx(:node_shared_secrets, + index: 2, + timestamp: curr_time |> DateTime.add(-7_200, :second), + prev_txn: [txn1] + ) + + txn3 = + TransactionFactory.create_network_tx(:node_shared_secrets, + index: 3, + timestamp: curr_time |> DateTime.add(-3_600, :second), + prev_txn: [txn2] + ) + + txn4 = + TransactionFactory.create_network_tx(:node_shared_secrets, + index: 4, + timestamp: curr_time, + prev_txn: [txn3] + ) + + :persistent_term.put(:node_shared_secrets_gen_addr, txn0.address) + %{txn0: txn0, txn1: txn1, txn2: txn2, txn3: txn3, txn4: txn4} + end + + test "Should return :ok when Genesis Address are not loaded", _nss_chain do + # first time boot no txns exits yet + :persistent_term.put(:node_shared_secrets_gen_addr, nil) + + assert :ok = + Bootstrap.do_resync_network_chain( + [:node_shared_secrets], + _nodes = P2P.authorized_and_available_nodes() + ) + end + + test "Should return :ok when last address match (locally and remotely)", nss_chain do + # node restart but within renewal interval + me = self() + addr0 = nss_chain.txn0.address + + MockDB + |> stub(:get_last_chain_address, fn ^addr0 -> + send(me, :local_last_addr_request) + {nss_chain.txn4.address, DateTime.utc_now()} + end) + + MockClient + |> stub(:send_message, fn + _, %GetLastTransactionAddress{address: ^addr0}, _ -> + send(me, :remote_last_addr_request) + {:ok, %LastTransactionAddress{address: nss_chain.txn4.address}} + + _, %GetTransaction{}, _ -> + send(me, :fetch_last_txn) + end) + + assert :ok = + Bootstrap.do_resync_network_chain( + [:node_shared_secrets], + _nodes = P2P.authorized_and_available_nodes() + ) + + assert_receive(:local_last_addr_request) + assert_receive(:remote_last_addr_request) + refute_receive(:fetch_last_txn) + end + + test "should Retrieve and Store Network tx's, when last tx's not available", nss_chain do + # scenario nss chain + # addr0 -> addr1 -> addr2 -> addr3 -> addr4 + # node1 => addr0 -> addr1 -> addr2 + # node2 => addr0 -> addr1 -> addr2 -> addr3 -> addr4 + addr0 = nss_chain.txn0.address + addr2 = nss_chain.txn2.address + addr3 = nss_chain.txn3.address + addr4 = nss_chain.txn4.address + + me = self() + + MockDB + |> stub(:get_last_chain_address, fn + ^addr0 -> {addr2, DateTime.utc_now()} + end) + |> stub(:transaction_exists?, fn + ^addr4 -> false + ^addr3 -> false + ^addr2 -> true + end) + |> expect(:get_transaction, fn ^addr3, _ -> + {:error, :transaction_not_exists} + end) + |> expect(:write_transaction_chain, fn + txn_list -> + # to know this fx executed or not we use send + send(me, {:write_transaction_chain, txn_list}) + :ok + end) + |> expect(:write_transaction, fn _tx -> + :ok + end) + + MockClient + |> stub(:send_message, fn + _, %GetLastTransactionAddress{address: ^addr0}, _ -> + {:ok, %LastTransactionAddress{address: addr4}} + + _, %GetTransaction{address: ^addr4}, _ -> + {:ok, nss_chain.txn4} + + _, %GetTransaction{address: ^addr3}, _ -> + {:ok, nss_chain.txn3} + + _, %GetTransactionInputs{address: ^addr3}, _ -> + {:ok, %TransactionInputList{inputs: []}} + + _, %GetFirstAddress{address: ^addr3}, _ -> + {:ok, %FirstAddress{address: addr0}} + + _, %GetTransactionChain{address: ^addr3, paging_state: ^addr2}, _ -> + {:ok, + %TransactionList{ + transactions: [nss_chain.txn3, nss_chain.txn4], + more?: false, + paging_state: nil + }} + end) + + assert :ok = + Bootstrap.do_resync_network_chain( + [:node_shared_secrets], + _nodes = P2P.authorized_and_available_nodes() + ) + + # flow + # get_gen_addr(:pers_term) -> resolve_last_address -> get_last_address + # | + # validate_and_store_transaction_chain <- fetch_transaction_remotely + # | + # transaction_exists? -> fetch_context(tx) -> get_last_txn (db then -> remote check) + # | + # transaction_exists?(prev_txn\tx3) <- stream_previous_chain <- fetch_inputs_remotely + # | + # stream_transaction_chain(addr3/prev-tx) -> fetch_genesis_address_remotely -> + # | + # &TransactionChain.write/1 <- stream_remotely(addr3,addr2) <- get_last_address(locally) + # | + # write_transaction(tx4) -> ingest txn4 + assert_receive({:write_transaction_chain, txn_list}) + + assert [] == + Enum.reduce(txn_list, [addr4, addr3], fn tx, acc -> + acc -- [tx.address] + end) + end + + defp p2p_context() do + pb_key3 = Crypto.derive_keypair("key33", 0) |> elem(0) + + SharedSecrets.add_origin_public_key(:software, Crypto.first_node_public_key()) + + coordinator_node = %Node{ + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.last_node_public_key(), + authorized?: true, + available?: true, + authorization_date: DateTime.add(DateTime.utc_now(), -86_400, :second), + geo_patch: "AAA", + network_patch: "AAA", + enrollment_date: DateTime.add(DateTime.utc_now(), -86_400, :second), + reward_address: Crypto.derive_address(Crypto.last_node_public_key()) + } + + storage_nodes = [ + %Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: pb_key3, + last_public_key: pb_key3, + available?: true, + authorized?: true, + geo_patch: "BBB", + network_patch: "BBB", + authorization_date: DateTime.add(DateTime.utc_now(), -86_400, :second), + reward_address: Crypto.derive_address(pb_key3), + enrollment_date: DateTime.add(DateTime.utc_now(), -86_400, :second) + } + ] + + Enum.each(storage_nodes, &P2P.add_and_connect_node(&1)) + + # P2P.add_and_connect_node(welcome_node) + P2P.add_and_connect_node(coordinator_node) + end + end end diff --git a/test/support/transaction_factory.ex b/test/support/transaction_factory.ex index 9ac7b0414..1174bc4c9 100644 --- a/test/support/transaction_factory.ex +++ b/test/support/transaction_factory.ex @@ -1,11 +1,12 @@ defmodule Archethic.TransactionFactory do @moduledoc false - alias Archethic.Crypto - - alias Archethic.Election - - alias Archethic.Mining.Fee + alias Archethic.{ + Crypto, + Election, + Mining.Fee, + SharedSecrets + } alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction @@ -208,4 +209,51 @@ defmodule Archethic.TransactionFactory do %{tx | validation_stamp: validation_stamp, cross_validation_stamps: [cross_validation_stamp]} end + + @doc """ + Creates a valid Node Shared Secrets Tx with parameters index, timestamp, prev_txn + """ + @spec create_network_tx(:node_shared_secrets, keyword) :: + Archethic.TransactionChain.Transaction.t() + def create_network_tx(_type = :node_shared_secrets, opts) do + inputs = Keyword.get(opts, :inputs, []) + seed = Keyword.get(opts, :seed, "daily_nonce_seed") + index = Keyword.get(opts, :index) + timestamp = Keyword.get(opts, :timestamp) + aes_key = :crypto.strong_rand_bytes(32) + prev_txn = Keyword.get(opts, :prev_txn, []) + + tx = + SharedSecrets.new_node_shared_secrets_transaction( + [Crypto.last_node_public_key()], + seed, + aes_key, + index + ) + + ledger_operations = + %LedgerOperations{ + fee: Fee.calculate(tx, 0.07), + transaction_movements: Transaction.get_movements(tx) + } + |> LedgerOperations.consume_inputs(tx.address, inputs) + + validation_stamp = + %ValidationStamp{ + timestamp: timestamp, + proof_of_work: Crypto.origin_node_public_key(), + proof_of_election: Election.validation_nodes_election_seed_sorting(tx, timestamp), + proof_of_integrity: TransactionChain.proof_of_integrity([tx | prev_txn]), + ledger_operations: ledger_operations + } + |> ValidationStamp.sign() + + cross_validation_stamp = CrossValidationStamp.sign(%CrossValidationStamp{}, validation_stamp) + + %{ + tx + | validation_stamp: validation_stamp, + cross_validation_stamps: [cross_validation_stamp] + } + end end