From c1ae5bee3ac3477981905d697477deee57a520c8 Mon Sep 17 00:00:00 2001 From: apoorv-2204 Date: Wed, 22 Mar 2023 19:50:19 +0530 Subject: [PATCH] improve_synchronization_when_node_ip_changes_#887 --- .../bootstrap/transaction_handler.ex | 43 ++---------- lib/archethic/networking/scheduler.ex | 69 ++++++++++--------- lib/archethic/self_repair.ex | 2 +- lib/archethic/utils.ex | 50 ++++++++++---- 4 files changed, 81 insertions(+), 83 deletions(-) diff --git a/lib/archethic/bootstrap/transaction_handler.ex b/lib/archethic/bootstrap/transaction_handler.ex index 00baa5083..e341900f5 100644 --- a/lib/archethic/bootstrap/transaction_handler.ex +++ b/lib/archethic/bootstrap/transaction_handler.ex @@ -1,21 +1,9 @@ defmodule Archethic.Bootstrap.TransactionHandler do @moduledoc false - alias Archethic.Crypto - - alias Archethic.Election - - alias Archethic.P2P - alias Archethic.P2P.Message.GetTransactionSummary - alias Archethic.P2P.Message.TransactionSummaryMessage - alias Archethic.P2P.Message.NewTransaction - alias Archethic.P2P.Message.NotFound - alias Archethic.P2P.Message.Ok - alias Archethic.P2P.Node - - alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.TransactionData - alias Archethic.TransactionChain.TransactionSummary + alias Archethic.{Crypto, Utils, Election, P2P, TransactionChain} + alias P2P.{Node, Message.NewTransaction, Message.Ok} + alias TransactionChain.{Transaction, TransactionData} require Logger @@ -54,7 +42,7 @@ defmodule Archethic.Bootstrap.TransactionHandler do ) |> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key())) - case await_confirmation(tx.address, storage_nodes) do + case Utils.await_confirmation(tx.address, storage_nodes) do :ok -> :ok @@ -73,29 +61,6 @@ defmodule Archethic.Bootstrap.TransactionHandler do defp do_send_transaction([], _), do: {:error, :network_issue} - @spec await_confirmation(tx_address :: binary(), list(Node.t())) :: - :ok | {:error, :network_issue} - defp await_confirmation(tx_address, [node | rest]) do - case P2P.send_message(node, %GetTransactionSummary{address: tx_address}) do - {:ok, - %TransactionSummaryMessage{transaction_summary: %TransactionSummary{address: ^tx_address}}} -> - :ok - - {:ok, %NotFound{}} -> - Process.sleep(200) - await_confirmation(tx_address, [node | rest]) - - {:error, e} -> - Logger.error("Cannot get transaction summary - #{inspect(e)}", - node: Base.encode16(node.first_public_key) - ) - - await_confirmation(tx_address, rest) - end - end - - defp await_confirmation(_, []), do: {:error, :network_issue} - @doc """ Create a new node transaction """ diff --git a/lib/archethic/networking/scheduler.ex b/lib/archethic/networking/scheduler.ex index afdd76d8a..7437e0f88 100644 --- a/lib/archethic/networking/scheduler.ex +++ b/lib/archethic/networking/scheduler.ex @@ -4,25 +4,13 @@ defmodule Archethic.Networking.Scheduler do use GenServer @vsn Mix.Project.config()[:version] - alias Archethic.Crypto + alias Archethic.{Crypto, P2P, P2P.Node, Networking, TaskSupervisor, Utils, PubSub} + alias Archethic.{SelfRepair, TransactionChain} - alias Archethic.P2P - alias Archethic.P2P.Listener, as: P2PListener - alias Archethic.P2P.Node - - alias Archethic.Networking.IPLookup - alias Archethic.Networking.PortForwarding - - alias Archethic.TaskSupervisor - - alias Archethic.TransactionChain - alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.TransactionData - - alias Archethic.Utils - - alias Archethic.PubSub + alias Networking.{IPLookup, PortForwarding} + alias TransactionChain.{Transaction, TransactionData} + alias P2P.Listener, as: P2PListener alias ArchethicWeb.Endpoint, as: WebEndpoint require Logger @@ -101,20 +89,23 @@ defmodule Archethic.Networking.Scheduler do origin_public_key = Crypto.origin_node_public_key() key_certificate = Crypto.get_key_certificate(origin_public_key) - Transaction.new(:node, %TransactionData{ - code: code, - content: - Node.encode_transaction_content( - ip, - p2p_port, - web_port, - transport, - reward_address, - origin_public_key, - key_certificate - ) - }) - |> Archethic.send_new_transaction() + tx = + Transaction.new(:node, %TransactionData{ + code: code, + content: + Node.encode_transaction_content( + ip, + p2p_port, + web_port, + transport, + reward_address, + origin_public_key, + key_certificate + ) + }) + + Archethic.send_new_transaction(tx) + handle_new_ip(tx) else :error -> Logger.warning("Cannot open port") @@ -139,4 +130,20 @@ defmodule Archethic.Networking.Scheduler do {:ok, p2p_port, web_port} end end + + defp handle_new_ip(%Transaction{address: tx_address}) do + nodes = + P2P.authorized_and_available_nodes() + |> Enum.filter(&Node.locally_available?/1) + |> P2P.nearest_nodes() + + Utils.await_confirmation(tx_address, nodes) + + types = [:node, :oracle, :node_shared_secrets, :reward] + + Task.Supervisor.async_stream_nolink(Archethic.TaskSupervisor, types, fn type -> + SelfRepair.resync_network_chain(type, nodes) + end) + |> Stream.run() + end end diff --git a/lib/archethic/self_repair.ex b/lib/archethic/self_repair.ex index db83a3107..8ecb11861 100755 --- a/lib/archethic/self_repair.ex +++ b/lib/archethic/self_repair.ex @@ -224,7 +224,7 @@ defmodule Archethic.SelfRepair do :ok end - @spec resync_network_chain(atom(), list(Node.t()) | []) :: :ok | :error + @spec resync_network_chain(type :: atom(), nodes :: list(Node.t()) | []) :: :ok | :error def resync_network_chain(_, []), do: Logger.notice("Enforce Resync of Network Txs: No-Nodes") diff --git a/lib/archethic/utils.ex b/lib/archethic/utils.ex index 001cae540..02ceeee85 100644 --- a/lib/archethic/utils.ex +++ b/lib/archethic/utils.ex @@ -1,24 +1,22 @@ defmodule Archethic.Utils do @moduledoc false - alias Crontab.CronExpression.Parser, as: CronParser - alias Crontab.Scheduler, as: CronScheduler - - alias Archethic.BeaconChain.ReplicationAttestation - - alias Archethic.TransactionChain.TransactionSummary - - alias Archethic.Crypto - - alias Archethic.P2P.Node + alias Archethic.{BeaconChain, Crypto, P2P, TransactionChain, P2P.Node, P2P.Message} + alias BeaconChain.{ReplicationAttestation} + alias TransactionChain.{Transaction, TransactionData, TransactionSummary} - alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.TransactionData + alias Message.{GetTransactionSummary, TransactionSummaryMessage} alias Archethic.Reward.Scheduler, as: RewardScheduler + alias Crontab.CronExpression.Parser, as: CronParser + alias Crontab.Scheduler, as: CronScheduler import Bitwise + require Logger + + use Retry + @doc """ Compute an offset of the next shift in seconds for a given time interval @@ -997,4 +995,32 @@ defmodule Archethic.Utils do :crypto.hash(:sha256, data_to_digest) |> Base.encode16() end + + @spec await_confirmation(tx_address :: binary(), list(Node.t())) :: + :ok | {:error, :network_issue} + def await_confirmation(tx_address, nodes) do + acceptance_resolver = fn + {:ok, + %TransactionSummaryMessage{transaction_summary: %TransactionSummary{address: ^tx_address}}} -> + true + + _ -> + false + end + + # at 1th , 2th , 4th , 8th , 16th , 32th second + retry_while with: exponential_backoff(1000, 2) |> expiry(70_000) do + case P2P.quorum_read( + nodes, + %GetTransactionSummary{address: tx_address}, + acceptance_resolver + ) do + {:ok, _} -> + {:halt, :ok} + + _ -> + {:cont, {:error, :network_issue}} + end + end + end end