From 1c03c756558ddeb59f1bf38959f3fb9d74811b71 Mon Sep 17 00:00:00 2001 From: apoorv-2204 Date: Tue, 7 Feb 2023 17:38:12 +0530 Subject: [PATCH] Move forced resync function to self_repair --- lib/archethic/bootstrap.ex | 67 ++---------------- lib/archethic/crypto.ex | 8 +++ lib/archethic/self_repair/resync_worker.ex | 82 ++++++++++++++++++++++ 3 files changed, 97 insertions(+), 60 deletions(-) create mode 100644 lib/archethic/self_repair/resync_worker.ex diff --git a/lib/archethic/bootstrap.ex b/lib/archethic/bootstrap.ex index 7eb13ba096..93fcbad7f5 100644 --- a/lib/archethic/bootstrap.ex +++ b/lib/archethic/bootstrap.ex @@ -3,22 +3,11 @@ defmodule Archethic.Bootstrap do Manage Archethic Node Bootstrapping """ - alias Archethic.Bootstrap.{ - NetworkInit, - Sync, - TransactionHandler - } - - alias Archethic.{ - Crypto, - Networking, - P2P, - P2P.Node, - P2P.Listener, - SelfRepair, - TransactionChain, - Replication - } + alias Archethic.{Bootstrap, Crypto, Networking, P2P, P2P.Node, P2P.Listener} + + alias Archethic.{SelfRepair, SelfRepair.ResyncWorker, TransactionChain} + + alias Bootstrap.{NetworkInit, Sync, TransactionHandler} require Logger @@ -234,54 +223,12 @@ defmodule Archethic.Bootstrap do &(&1.first_public_key == Crypto.first_node_public_key()) ) + # blocking code [:oracle, :node_shared_secrets, :reward] - |> Enum.each(&do_resync_network_chain(&1, nodes)) - end - end - - @spec do_resync_network_chain(list(atom), list(P2P.Node.t()) | []) :: :ok - def do_resync_network_chain(_type_list, _nodes = []), - do: Logger.info("Enforce Reync of Network Txs: failure, No-Nodes") - - # by type: Get gen addr, get last address (remotely & locally) - # compare, if dont match, fetch last tx remotely - def do_resync_network_chain(type, nodes) when is_list(nodes) do - with addr when is_binary(addr) <- get_genesis_addr(type), - {:ok, rem_last_addr} <- TransactionChain.resolve_last_address(addr), - {local_last_addr, _} <- TransactionChain.get_last_address(addr), - false <- rem_last_addr == local_last_addr, - {:ok, tx} <- TransactionChain.fetch_transaction_remotely(rem_last_addr, nodes), - :ok <- Replication.validate_and_store_transaction_chain(tx) do - Logger.info("Enforced Resync: Success", transaction_type: type) - :ok - else - true -> - Logger.info("Enforced Resync: No new transaction to sync", transaction_type: type) - :ok - - e when e in [nil, []] -> - Logger.debug("Enforced Resync: Transaction not available", transaction_type: type) - :ok - - e -> - Logger.debug("Enforced Resync: Unexpected Error", transaction_type: type) - Logger.debug(e) + |> Enum.each(&ResyncWorker.resync(&1, nodes)) end end - @spec get_genesis_addr(:node_shared_secrets | :oracle | :reward) :: binary() | nil - defp get_genesis_addr(:oracle) do - Archethic.OracleChain.genesis_address().current |> elem(0) - end - - defp get_genesis_addr(:node_shared_secrets) do - Archethic.SharedSecrets.genesis_address(:node_shared_secrets) - end - - defp get_genesis_addr(:reward) do - Archethic.Reward.genesis_address() - end - defp first_initialization( ip, port, diff --git a/lib/archethic/crypto.ex b/lib/archethic/crypto.ex index 4890dd127d..318c477bca 100755 --- a/lib/archethic/crypto.ex +++ b/lib/archethic/crypto.ex @@ -1320,4 +1320,12 @@ defmodule Archethic.Crypto do def list_supported_hash_functions(), do: @supported_hashes @string_hashes Enum.map(@supported_hashes, &Atom.to_string/1) def list_supported_hash_functions(:string), do: @string_hashes + + @spec genesis_address(:node_shared_secrets | :oracle | :reward) :: binary() | nil + def genesis_address(:oracle), do: Archethic.OracleChain.genesis_address().current |> elem(0) + + def genesis_address(:node_shared_secrets), + do: Archethic.SharedSecrets.genesis_address(:node_shared_secrets) + + def genesis_address(:reward), do: Archethic.Reward.genesis_address() end diff --git a/lib/archethic/self_repair/resync_worker.ex b/lib/archethic/self_repair/resync_worker.ex new file mode 100644 index 0000000000..d2f7bbb0d1 --- /dev/null +++ b/lib/archethic/self_repair/resync_worker.ex @@ -0,0 +1,82 @@ +defmodule Archethic.SelfRepair.ResyncWorker do + @moduledoc """ + A Long Running generver impl, handles a const req of network resync. + """ + @vsn Mix.Project.config()[:version] + + alias Archethic.{Crypto, P2P, TransactionChain, Replication} + + use GenServer, restart: :permanent + require Logger + + # ------------------------------------------- + # API + # ------------------------------------------- + def sync_again(name \\ __MODULE__) do + GenServer.cast(name, :resync_request) + end + + # ------------------------------------------- + # CALLBACKS + # ------------------------------------------- + + def start_link(args \\ [], opts \\ [name: __MODULE__]) do + GenServer.start_link(__MODULE__, args, opts) + end + + def init(_args) do + Logger.notice("[#{inspect(__MODULE__)}] Started") + {:ok, %{scheduled: false}} + end + + def handle_cast(:resync_request, server_data = %{scheduled: true}) do + {:noreply, server_data} + end + + def handle_cast(:resync_request, %{scheduled: false}) do + {:noreply, schedule()} + end + + def handle_info({:DOWN, _ref, :process, pid, _normal}, %{task_pid: task_pid}) + when pid == task_pid do + {:noreply, %{scheduled: false}} + end + + def handle_info(_, data), do: {:noreply, data} + + # ------------------------------------------- + # LOGIC IMPL + # ------------------------------------------- + + defp schedule() do + %Task{pid: pid} = Task.async(fn -> resync(:node_shared_secrets, P2P.exclusive_nodes()) end) + %{task_pid: pid, scheduled: true} + end + + @spec resync(atom(), list(P2P.Node.t()) | []) :: :ok | :error + def resync(_, []), + do: Logger.error("Enforce Resync of Network Txs: No-Nodes") + + def resync(type, nodes) do + with addr when is_binary(addr) <- Crypto.genesis_address(type), + {:ok, rem_last_addr} <- TransactionChain.resolve_last_address(addr), + {local_last_addr, _} <- TransactionChain.get_last_address(addr), + false <- rem_last_addr == local_last_addr, + {:ok, tx} <- TransactionChain.fetch_transaction_remotely(rem_last_addr, nodes), + :ok <- Replication.validate_and_store_transaction_chain(tx) do + Logger.info("Enforced Resync: Success", transaction_type: type) + :ok + else + true -> + Logger.info("Enforced Resync: No new transaction to sync", transaction_type: type) + :ok + + e when e in [nil, []] -> + Logger.debug("Enforced Resync: Transaction not available", transaction_type: type) + :ok + + e -> + Logger.debug("Enforced Resync: Error #{inspect(e)}", transaction_type: type) + end + end +end