From 5e40c86495c80a88508897cd0d5e42ec5e69d3de Mon Sep 17 00:00:00 2001 From: Apoorv Date: Thu, 25 Aug 2022 23:07:07 +0530 Subject: [PATCH] requested changes --- lib/archethic/bootstrap.ex | 72 +++++++++++++++++++++++++++++++ test/archethic/bootstrap_test.exs | 71 ++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+) diff --git a/lib/archethic/bootstrap.ex b/lib/archethic/bootstrap.ex index f724835b8c..5ac46a651e 100644 --- a/lib/archethic/bootstrap.ex +++ b/lib/archethic/bootstrap.ex @@ -16,6 +16,8 @@ defmodule Archethic.Bootstrap do alias Archethic.P2P.Listener alias Archethic.SelfRepair + alias Archethic.TransactionChain + alias Archethic.Replication require Logger @@ -207,6 +209,8 @@ defmodule Archethic.Bootstrap do Logger.info("Synchronization finished") end + resync_network_chain([:oracle, :node_shared_secrets], P2P.authorized_and_available_nodes()) + Sync.publish_end_of_sync() SelfRepair.start_scheduler() @@ -214,6 +218,74 @@ defmodule Archethic.Bootstrap do Listener.listen() end + # when no node loaded in memtable + @spec resync_network_chain(any, maybe_improper_list) :: :ok + def resync_network_chain(_type_list, _nodes = []), + do: Logger.info("Enforced Resync of Network Txs: failure, No-Nodes") + + def resync_network_chain(type_list, nodes) when is_list(nodes) do + # spawn process for each type of txn to sync + Task.Supervisor.async_stream_nolink(Archethic.TaskSupervisor, type_list, fn type -> + # acquire a(any) address of type txn, (for querying DB) + case TransactionChain.list_addresses_by_type(type) |> Stream.take(1) |> Enum.at(0) do + nil -> + Logger.info("Enforced Resync: failure ", transaction_type: type) + Logger.debug("Transaction address: nil") + :ok + + [] -> + # no txn avilable for that type txn + # can fetch that "type" network chain + Logger.info("Enforced Resync: failure ", transaction_type: type) + Logger.debug("Transaction address: nil") + :ok + + addr -> + # get last txn locally available + {last_addr, _dt} = TransactionChain.get_last_address(addr) + + try do + # fetch txs remotely post last_addr + TransactionChain.stream_remotely(last_addr, nodes, last_addr) + |> Enum.to_list() + |> List.flatten() + |> Enum.each(fn tx = %TransactionChain.Transaction{} -> + if TransactionChain.transaction_exists?(tx.address) do + # tx already exists + :ok + else + case Replication.validate_and_store_transaction(tx) do + :ok -> + Logger.debug( + "Enforced Resync: #{tx.address |> Base.encode16()} status: :stored", + transaction_type: type + ) + + :ok + + {:error, e} -> + Logger.debug( + "Enforced Resync: #{tx.address |> Base.encode16()} status: :error_during_replication", + transaction_type: type + ) + + Logger.debug(e) + end + end + end) + + Logger.info("Enforced Resync: Success", transaction_type: type) + rescue + e -> + Logger.debug("Enforced Resync: Unexpected Error", transaction_type: type) + Logger.debug(e) + :ok + end + end + end) + |> Stream.run() + end + defp first_initialization( ip, port, diff --git a/test/archethic/bootstrap_test.exs b/test/archethic/bootstrap_test.exs index 6eff8e8c92..48773f0667 100644 --- a/test/archethic/bootstrap_test.exs +++ b/test/archethic/bootstrap_test.exs @@ -431,4 +431,75 @@ defmodule Archethic.BootstrapTest do Process.sleep(100) end end + + describe "resync_network_chain/2" do + test "should Retrieve and Store Network tx's" do + pbk = + Crypto.derive_keypair("node_for_bootstrap1", 0) + |> elem(0) + + P2P.add_and_connect_node(%Node{ + ip: {80, 10, 20, 102}, + port: 3002, + http_port: 4000, + last_public_key: pbk, + first_public_key: pbk, + network_patch: "AAA", + geo_patch: "AAA", + available?: true, + enrollment_date: DateTime.utc_now(), + reward_address: pbk |> Crypto.derive_address(), + authorized?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-86_400), + synced?: true + }) + + addr = Crypto.derive_keypair("nss_test", 1) |> elem(0) |> Crypto.derive_address() + + MockDB + |> expect(:list_addresses_by_type, 1, fn :node_shared_secrets -> [addr] end) + |> expect(:get_last_chain_address, 1, fn addr -> {addr, DateTime.utc_now()} end) + |> stub(:transaction_exists?, fn _ -> false end) + + MockClient + |> stub(:send_message, fn + _, %GetTransactionChainLength{}, _ -> + %TransactionChainLength{length: 4} + + _, %GetTransactionChain{}, _ -> + {:ok, + %TransactionList{ + transactions: [ + %Transaction{ + type: :node_shared_secrets, + address: + Crypto.derive_keypair("nss_test", 2) + |> elem(0) + |> Crypto.derive_address() + }, + %Transaction{ + type: :node_shared_secrets, + address: + Crypto.derive_keypair("nss_test", 3) + |> elem(0) + |> Crypto.derive_address() + }, + %Transaction{ + type: :node_shared_secrets, + address: + Crypto.derive_keypair("nss_test", 4) + |> elem(0) + |> Crypto.derive_address() + } + ] + }} + end) + + :ok = + Bootstrap.resync_network_chain( + [:node_shared_secrets], + _nodes = P2P.authorized_and_available_nodes() + ) + end + end end