Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve_synchronization_when_node_ip_changes_#887 #930

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 4 additions & 39 deletions lib/archethic/bootstrap/transaction_handler.ex
Original file line number Diff line number Diff line change
@@ -1,21 +1,9 @@
defmodule Archethic.Bootstrap.TransactionHandler do
@moduledoc false

alias Archethic.Crypto

alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.GetTransactionSummary
alias Archethic.P2P.Message.TransactionSummaryMessage
alias Archethic.P2P.Message.NewTransaction
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Node

alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionData
alias Archethic.TransactionChain.TransactionSummary
alias Archethic.{Crypto, Utils, Election, P2P, TransactionChain}
alias P2P.{Node, Message.NewTransaction, Message.Ok}
alias TransactionChain.{Transaction, TransactionData}

require Logger

Expand Down Expand Up @@ -54,7 +42,7 @@ defmodule Archethic.Bootstrap.TransactionHandler do
)
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))

case await_confirmation(tx.address, storage_nodes) do
case Utils.await_confirmation(tx.address, storage_nodes) do
:ok ->
:ok

Expand All @@ -73,29 +61,6 @@ defmodule Archethic.Bootstrap.TransactionHandler do

defp do_send_transaction([], _), do: {:error, :network_issue}

@spec await_confirmation(tx_address :: binary(), list(Node.t())) ::
:ok | {:error, :network_issue}
defp await_confirmation(tx_address, [node | rest]) do
case P2P.send_message(node, %GetTransactionSummary{address: tx_address}) do
{:ok,
%TransactionSummaryMessage{transaction_summary: %TransactionSummary{address: ^tx_address}}} ->
:ok

{:ok, %NotFound{}} ->
Process.sleep(200)
await_confirmation(tx_address, [node | rest])

{:error, e} ->
Logger.error("Cannot get transaction summary - #{inspect(e)}",
node: Base.encode16(node.first_public_key)
)

await_confirmation(tx_address, rest)
end
end

defp await_confirmation(_, []), do: {:error, :network_issue}

@doc """
Create a new node transaction
"""
Expand Down
69 changes: 38 additions & 31 deletions lib/archethic/networking/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,13 @@ defmodule Archethic.Networking.Scheduler do
use GenServer
@vsn Mix.Project.config()[:version]

alias Archethic.Crypto
alias Archethic.{Crypto, P2P, P2P.Node, Networking, TaskSupervisor, Utils, PubSub}
alias Archethic.{SelfRepair, TransactionChain}

alias Archethic.P2P
alias Archethic.P2P.Listener, as: P2PListener
alias Archethic.P2P.Node

alias Archethic.Networking.IPLookup
alias Archethic.Networking.PortForwarding

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionData

alias Archethic.Utils

alias Archethic.PubSub
alias Networking.{IPLookup, PortForwarding}
alias TransactionChain.{Transaction, TransactionData}

alias P2P.Listener, as: P2PListener
alias ArchethicWeb.Endpoint, as: WebEndpoint

require Logger
Expand Down Expand Up @@ -101,20 +89,23 @@ defmodule Archethic.Networking.Scheduler do
origin_public_key = Crypto.origin_node_public_key()
key_certificate = Crypto.get_key_certificate(origin_public_key)

Transaction.new(:node, %TransactionData{
code: code,
content:
Node.encode_transaction_content(
ip,
p2p_port,
web_port,
transport,
reward_address,
origin_public_key,
key_certificate
)
})
|> Archethic.send_new_transaction()
tx =
Transaction.new(:node, %TransactionData{
code: code,
content:
Node.encode_transaction_content(
ip,
p2p_port,
web_port,
transport,
reward_address,
origin_public_key,
key_certificate
)
})

Archethic.send_new_transaction(tx)
handle_new_ip(tx)
else
:error ->
Logger.warning("Cannot open port")
Expand All @@ -139,4 +130,20 @@ defmodule Archethic.Networking.Scheduler do
{:ok, p2p_port, web_port}
end
end

defp handle_new_ip(%Transaction{address: tx_address}) do
nodes =
P2P.authorized_and_available_nodes()
|> Enum.filter(&Node.locally_available?/1)
samuelmanzanera marked this conversation as resolved.
Show resolved Hide resolved
|> P2P.nearest_nodes()

Utils.await_confirmation(tx_address, nodes)

types = [:node, :oracle, :node_shared_secrets, :reward]

Task.Supervisor.async_stream_nolink(Archethic.TaskSupervisor, types, fn type ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After #933 merge, you'd need to use NetworkChain.asynchronous_resync_many(network_chain_types) instead.

SelfRepair.resync_network_chain(type, nodes)
end)
|> Stream.run()
end
end
2 changes: 1 addition & 1 deletion lib/archethic/self_repair.ex
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ defmodule Archethic.SelfRepair do
:ok
end

@spec resync_network_chain(atom(), list(Node.t()) | []) :: :ok | :error
@spec resync_network_chain(type :: atom(), nodes :: list(Node.t()) | []) :: :ok | :error
def resync_network_chain(_, []),
do: Logger.notice("Enforce Resync of Network Txs: No-Nodes")

Expand Down
50 changes: 38 additions & 12 deletions lib/archethic/utils.ex
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
defmodule Archethic.Utils do
@moduledoc false

alias Crontab.CronExpression.Parser, as: CronParser
alias Crontab.Scheduler, as: CronScheduler

alias Archethic.BeaconChain.ReplicationAttestation

alias Archethic.TransactionChain.TransactionSummary

alias Archethic.Crypto

alias Archethic.P2P.Node
alias Archethic.{BeaconChain, Crypto, P2P, TransactionChain, P2P.Node, P2P.Message}
alias BeaconChain.{ReplicationAttestation}
alias TransactionChain.{Transaction, TransactionData, TransactionSummary}

alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionData
alias Message.{GetTransactionSummary, TransactionSummaryMessage}

alias Archethic.Reward.Scheduler, as: RewardScheduler
alias Crontab.CronExpression.Parser, as: CronParser
alias Crontab.Scheduler, as: CronScheduler

import Bitwise

require Logger

use Retry

@doc """
Compute an offset of the next shift in seconds for a given time interval

Expand Down Expand Up @@ -997,4 +995,32 @@ defmodule Archethic.Utils do
:crypto.hash(:sha256, data_to_digest)
|> Base.encode16()
end

@spec await_confirmation(tx_address :: binary(), list(Node.t())) ::
:ok | {:error, :network_issue}
def await_confirmation(tx_address, nodes) do
acceptance_resolver = fn
{:ok,
%TransactionSummaryMessage{transaction_summary: %TransactionSummary{address: ^tx_address}}} ->
true

_ ->
false
end

# at 1th , 2th , 4th , 8th , 16th , 32th second
retry_while with: exponential_backoff(1000, 2) |> expiry(70_000) do
case P2P.quorum_read(
nodes,
%GetTransactionSummary{address: tx_address},
acceptance_resolver
) do
{:ok, _} ->
{:halt, :ok}

_ ->
{:cont, {:error, :network_issue}}
end
end
end
end