Skip to content

Commit

Permalink
Move forced resync function to self_repair
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorv-2204 committed Feb 7, 2023
1 parent a9a40c5 commit 1c03c75
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 60 deletions.
67 changes: 7 additions & 60 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,11 @@ defmodule Archethic.Bootstrap do
Manage Archethic Node Bootstrapping
"""

alias Archethic.Bootstrap.{
NetworkInit,
Sync,
TransactionHandler
}

alias Archethic.{
Crypto,
Networking,
P2P,
P2P.Node,
P2P.Listener,
SelfRepair,
TransactionChain,
Replication
}
alias Archethic.{Bootstrap, Crypto, Networking, P2P, P2P.Node, P2P.Listener}

alias Archethic.{SelfRepair, SelfRepair.ResyncWorker, TransactionChain}

alias Bootstrap.{NetworkInit, Sync, TransactionHandler}

require Logger

Expand Down Expand Up @@ -234,54 +223,12 @@ defmodule Archethic.Bootstrap do
&(&1.first_public_key == Crypto.first_node_public_key())
)

# blocking code
[:oracle, :node_shared_secrets, :reward]
|> Enum.each(&do_resync_network_chain(&1, nodes))
end
end

@spec do_resync_network_chain(list(atom), list(P2P.Node.t()) | []) :: :ok
def do_resync_network_chain(_type_list, _nodes = []),
do: Logger.info("Enforce Reync of Network Txs: failure, No-Nodes")

# by type: Get gen addr, get last address (remotely & locally)
# compare, if dont match, fetch last tx remotely
def do_resync_network_chain(type, nodes) when is_list(nodes) do
with addr when is_binary(addr) <- get_genesis_addr(type),
{:ok, rem_last_addr} <- TransactionChain.resolve_last_address(addr),
{local_last_addr, _} <- TransactionChain.get_last_address(addr),
false <- rem_last_addr == local_last_addr,
{:ok, tx} <- TransactionChain.fetch_transaction_remotely(rem_last_addr, nodes),
:ok <- Replication.validate_and_store_transaction_chain(tx) do
Logger.info("Enforced Resync: Success", transaction_type: type)
:ok
else
true ->
Logger.info("Enforced Resync: No new transaction to sync", transaction_type: type)
:ok

e when e in [nil, []] ->
Logger.debug("Enforced Resync: Transaction not available", transaction_type: type)
:ok

e ->
Logger.debug("Enforced Resync: Unexpected Error", transaction_type: type)
Logger.debug(e)
|> Enum.each(&ResyncWorker.resync(&1, nodes))
end
end

@spec get_genesis_addr(:node_shared_secrets | :oracle | :reward) :: binary() | nil
defp get_genesis_addr(:oracle) do
Archethic.OracleChain.genesis_address().current |> elem(0)
end

defp get_genesis_addr(:node_shared_secrets) do
Archethic.SharedSecrets.genesis_address(:node_shared_secrets)
end

defp get_genesis_addr(:reward) do
Archethic.Reward.genesis_address()
end

defp first_initialization(
ip,
port,
Expand Down
8 changes: 8 additions & 0 deletions lib/archethic/crypto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1320,4 +1320,12 @@ defmodule Archethic.Crypto do
def list_supported_hash_functions(), do: @supported_hashes
@string_hashes Enum.map(@supported_hashes, &Atom.to_string/1)
def list_supported_hash_functions(:string), do: @string_hashes

@spec genesis_address(:node_shared_secrets | :oracle | :reward) :: binary() | nil
def genesis_address(:oracle), do: Archethic.OracleChain.genesis_address().current |> elem(0)

def genesis_address(:node_shared_secrets),
do: Archethic.SharedSecrets.genesis_address(:node_shared_secrets)

def genesis_address(:reward), do: Archethic.Reward.genesis_address()
end
82 changes: 82 additions & 0 deletions lib/archethic/self_repair/resync_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
defmodule Archethic.SelfRepair.ResyncWorker do
@moduledoc """
A Long Running generver impl, handles a const req of network resync.
"""
@vsn Mix.Project.config()[:version]

alias Archethic.{Crypto, P2P, TransactionChain, Replication}

use GenServer, restart: :permanent
require Logger

# -------------------------------------------
# API
# -------------------------------------------
def sync_again(name \\ __MODULE__) do
GenServer.cast(name, :resync_request)
end

# -------------------------------------------
# CALLBACKS
# -------------------------------------------

def start_link(args \\ [], opts \\ [name: __MODULE__]) do
GenServer.start_link(__MODULE__, args, opts)
end

def init(_args) do
Logger.notice("[#{inspect(__MODULE__)}] Started")
{:ok, %{scheduled: false}}
end

def handle_cast(:resync_request, server_data = %{scheduled: true}) do
{:noreply, server_data}
end

def handle_cast(:resync_request, %{scheduled: false}) do
{:noreply, schedule()}
end

def handle_info({:DOWN, _ref, :process, pid, _normal}, %{task_pid: task_pid})
when pid == task_pid do
{:noreply, %{scheduled: false}}
end

def handle_info(_, data), do: {:noreply, data}

# -------------------------------------------
# LOGIC IMPL
# -------------------------------------------

defp schedule() do
%Task{pid: pid} = Task.async(fn -> resync(:node_shared_secrets, P2P.exclusive_nodes()) end)
%{task_pid: pid, scheduled: true}
end

@spec resync(atom(), list(P2P.Node.t()) | []) :: :ok | :error
def resync(_, []),
do: Logger.error("Enforce Resync of Network Txs: No-Nodes")

def resync(type, nodes) do
with addr when is_binary(addr) <- Crypto.genesis_address(type),
{:ok, rem_last_addr} <- TransactionChain.resolve_last_address(addr),
{local_last_addr, _} <- TransactionChain.get_last_address(addr),
false <- rem_last_addr == local_last_addr,
{:ok, tx} <- TransactionChain.fetch_transaction_remotely(rem_last_addr, nodes),
:ok <- Replication.validate_and_store_transaction_chain(tx) do
Logger.info("Enforced Resync: Success", transaction_type: type)
:ok
else
true ->
Logger.info("Enforced Resync: No new transaction to sync", transaction_type: type)
:ok

e when e in [nil, []] ->
Logger.debug("Enforced Resync: Transaction not available", transaction_type: type)
:ok

e ->
Logger.debug("Enforced Resync: Error #{inspect(e)}", transaction_type: type)
end
end
end

0 comments on commit 1c03c75

Please sign in to comment.