Skip to content

Commit

Permalink
improve_synchronization_when_node_ip_changes_#887
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorv-2204 committed Mar 22, 2023
1 parent eb76d8d commit c1ae5be
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 83 deletions.
43 changes: 4 additions & 39 deletions lib/archethic/bootstrap/transaction_handler.ex
Original file line number Diff line number Diff line change
@@ -1,21 +1,9 @@
defmodule Archethic.Bootstrap.TransactionHandler do
@moduledoc false

alias Archethic.Crypto

alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.GetTransactionSummary
alias Archethic.P2P.Message.TransactionSummaryMessage
alias Archethic.P2P.Message.NewTransaction
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Node

alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionData
alias Archethic.TransactionChain.TransactionSummary
alias Archethic.{Crypto, Utils, Election, P2P, TransactionChain}
alias P2P.{Node, Message.NewTransaction, Message.Ok}
alias TransactionChain.{Transaction, TransactionData}

require Logger

Expand Down Expand Up @@ -54,7 +42,7 @@ defmodule Archethic.Bootstrap.TransactionHandler do
)
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))

case await_confirmation(tx.address, storage_nodes) do
case Utils.await_confirmation(tx.address, storage_nodes) do
:ok ->
:ok

Expand All @@ -73,29 +61,6 @@ defmodule Archethic.Bootstrap.TransactionHandler do

defp do_send_transaction([], _), do: {:error, :network_issue}

@spec await_confirmation(tx_address :: binary(), list(Node.t())) ::
:ok | {:error, :network_issue}
defp await_confirmation(tx_address, [node | rest]) do
case P2P.send_message(node, %GetTransactionSummary{address: tx_address}) do
{:ok,
%TransactionSummaryMessage{transaction_summary: %TransactionSummary{address: ^tx_address}}} ->
:ok

{:ok, %NotFound{}} ->
Process.sleep(200)
await_confirmation(tx_address, [node | rest])

{:error, e} ->
Logger.error("Cannot get transaction summary - #{inspect(e)}",
node: Base.encode16(node.first_public_key)
)

await_confirmation(tx_address, rest)
end
end

defp await_confirmation(_, []), do: {:error, :network_issue}

@doc """
Create a new node transaction
"""
Expand Down
69 changes: 38 additions & 31 deletions lib/archethic/networking/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,13 @@ defmodule Archethic.Networking.Scheduler do
use GenServer
@vsn Mix.Project.config()[:version]

alias Archethic.Crypto
alias Archethic.{Crypto, P2P, P2P.Node, Networking, TaskSupervisor, Utils, PubSub}
alias Archethic.{SelfRepair, TransactionChain}

alias Archethic.P2P
alias Archethic.P2P.Listener, as: P2PListener
alias Archethic.P2P.Node

alias Archethic.Networking.IPLookup
alias Archethic.Networking.PortForwarding

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionData

alias Archethic.Utils

alias Archethic.PubSub
alias Networking.{IPLookup, PortForwarding}
alias TransactionChain.{Transaction, TransactionData}

alias P2P.Listener, as: P2PListener
alias ArchethicWeb.Endpoint, as: WebEndpoint

require Logger
Expand Down Expand Up @@ -101,20 +89,23 @@ defmodule Archethic.Networking.Scheduler do
origin_public_key = Crypto.origin_node_public_key()
key_certificate = Crypto.get_key_certificate(origin_public_key)

Transaction.new(:node, %TransactionData{
code: code,
content:
Node.encode_transaction_content(
ip,
p2p_port,
web_port,
transport,
reward_address,
origin_public_key,
key_certificate
)
})
|> Archethic.send_new_transaction()
tx =
Transaction.new(:node, %TransactionData{
code: code,
content:
Node.encode_transaction_content(
ip,
p2p_port,
web_port,
transport,
reward_address,
origin_public_key,
key_certificate
)
})

Archethic.send_new_transaction(tx)
handle_new_ip(tx)
else
:error ->
Logger.warning("Cannot open port")
Expand All @@ -139,4 +130,20 @@ defmodule Archethic.Networking.Scheduler do
{:ok, p2p_port, web_port}
end
end

defp handle_new_ip(%Transaction{address: tx_address}) do
nodes =
P2P.authorized_and_available_nodes()
|> Enum.filter(&Node.locally_available?/1)
|> P2P.nearest_nodes()

Utils.await_confirmation(tx_address, nodes)

types = [:node, :oracle, :node_shared_secrets, :reward]

Task.Supervisor.async_stream_nolink(Archethic.TaskSupervisor, types, fn type ->
SelfRepair.resync_network_chain(type, nodes)
end)
|> Stream.run()
end
end
2 changes: 1 addition & 1 deletion lib/archethic/self_repair.ex
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ defmodule Archethic.SelfRepair do
:ok
end

@spec resync_network_chain(atom(), list(Node.t()) | []) :: :ok | :error
@spec resync_network_chain(type :: atom(), nodes :: list(Node.t()) | []) :: :ok | :error
def resync_network_chain(_, []),
do: Logger.notice("Enforce Resync of Network Txs: No-Nodes")

Expand Down
50 changes: 38 additions & 12 deletions lib/archethic/utils.ex
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
defmodule Archethic.Utils do
@moduledoc false

alias Crontab.CronExpression.Parser, as: CronParser
alias Crontab.Scheduler, as: CronScheduler

alias Archethic.BeaconChain.ReplicationAttestation

alias Archethic.TransactionChain.TransactionSummary

alias Archethic.Crypto

alias Archethic.P2P.Node
alias Archethic.{BeaconChain, Crypto, P2P, TransactionChain, P2P.Node, P2P.Message}
alias BeaconChain.{ReplicationAttestation}
alias TransactionChain.{Transaction, TransactionData, TransactionSummary}

alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionData
alias Message.{GetTransactionSummary, TransactionSummaryMessage}

alias Archethic.Reward.Scheduler, as: RewardScheduler
alias Crontab.CronExpression.Parser, as: CronParser
alias Crontab.Scheduler, as: CronScheduler

import Bitwise

require Logger

use Retry

@doc """
Compute an offset of the next shift in seconds for a given time interval
Expand Down Expand Up @@ -997,4 +995,32 @@ defmodule Archethic.Utils do
:crypto.hash(:sha256, data_to_digest)
|> Base.encode16()
end

@spec await_confirmation(tx_address :: binary(), list(Node.t())) ::
:ok | {:error, :network_issue}
def await_confirmation(tx_address, nodes) do
acceptance_resolver = fn
{:ok,
%TransactionSummaryMessage{transaction_summary: %TransactionSummary{address: ^tx_address}}} ->
true

_ ->
false
end

# at 1th , 2th , 4th , 8th , 16th , 32th second
retry_while with: exponential_backoff(1000, 2) |> expiry(70_000) do
case P2P.quorum_read(
nodes,
%GetTransactionSummary{address: tx_address},
acceptance_resolver
) do
{:ok, _} ->
{:halt, :ok}

_ ->
{:cont, {:error, :network_issue}}
end
end
end
end

0 comments on commit c1ae5be

Please sign in to comment.