Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"Rolling" SelfRepair #933

Merged
merged 46 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
0079639
NetworkChainView implementation
bchamagne Mar 8, 2023
6c66bed
lint
bchamagne Mar 8, 2023
f11ac1b
send_new_transaction check nodes responses
bchamagne Mar 9, 2023
c37d3bc
Remove join before the Crypto.hash
bchamagne Mar 15, 2023
4554574
cache the network chain hash
bchamagne Mar 15, 2023
6da2d6f
linting the send_new_transaction function
bchamagne Mar 15, 2023
cb8a024
remove useless spawn
bchamagne Mar 15, 2023
0d32d95
use Task instead of spawn
bchamagne Mar 15, 2023
a8f6793
lint
bchamagne Mar 20, 2023
c829d2f
adapt load_node_list() call
bchamagne Mar 21, 2023
a2979a4
use :crypto.hash instead of Archethic.Crypto
bchamagne Mar 22, 2023
116735e
cache the p2p_hash in the network view
bchamagne Mar 22, 2023
b749788
resync_network_chain(s) now use the RepairWorker
bchamagne Mar 22, 2023
86ab7cb
remove useless task
bchamagne Mar 22, 2023
5ba5655
add few comments
bchamagne Mar 22, 2023
e9ed527
lint for coherence
bchamagne Mar 22, 2023
abbe546
synchronize the node transactions in resync_p2p, add a timeout to the…
bchamagne Mar 23, 2023
db715f8
store the last address per origin family
bchamagne Mar 23, 2023
697fc67
add an FSM to resync network chain
bchamagne Mar 24, 2023
bf6a063
add tests for the network chain worker
bchamagne Mar 24, 2023
706d93d
add a :async flag to the resync functions + bootstrap wait for resync…
bchamagne Mar 24, 2023
13e0a34
add a test for the concurrent runs
bchamagne Mar 24, 2023
d01af50
move resync functions in networkchain module and explicit rename
bchamagne Mar 27, 2023
1175cb2
add @vsn attribute to the worker
bchamagne Mar 31, 2023
ad12c15
remove unused function
bchamagne Mar 31, 2023
9a7f01b
fix resync of :node network chain
bchamagne Mar 31, 2023
b39be05
check if there is already a repair in progress to avoid some I/O
bchamagne Apr 3, 2023
1e855aa
add a telemetry for the count of resyncs
bchamagne Apr 3, 2023
bced7ef
use node.last_address instead of deriving the last_public_key
bchamagne Apr 6, 2023
8203378
Return network_issue if there is no nodes to query
bchamagne Apr 19, 2023
16961ef
Remove RepairWorker & NetworkChainWorker. Replace them by DistinctEff…
bchamagne Apr 19, 2023
c44c099
Lint: use & function format
bchamagne Apr 19, 2023
c1028e4
Revert "Lint: use & function format"
bchamagne Apr 19, 2023
0b714ac
Revert "Remove RepairWorker & NetworkChainWorker. Replace them by Dis…
bchamagne Apr 19, 2023
af5bd14
replace NetworkChainWorker with a simple task
bchamagne Apr 19, 2023
0518f6f
add unit tests on extracted function
bchamagne Apr 19, 2023
2978c48
Lint: use private functions for readability
bchamagne Apr 20, 2023
4395365
Lint: check registry before spawning task
bchamagne Apr 20, 2023
796f644
rename utils function to a shorter name
bchamagne Apr 20, 2023
7b8d1ef
refactor RepairWorker to use a single API function
bchamagne Apr 20, 2023
f6878c1
change the RepairWorker tests
bchamagne Apr 20, 2023
71cf0f1
Merge branch 'develop' into 912-rolling-sync
bchamagne Apr 21, 2023
2c3bfbe
Refactor verify synchronization
Neylix Apr 25, 2023
c36d0db
Remove reward from rolling repair
Neylix Apr 25, 2023
73f73e4
Fix origin addresses bug
Neylix Apr 25, 2023
c080816
Fix conflict resolver error
Neylix Apr 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ config :archethic, Archethic.SelfRepair.Scheduler,
interval: "0 0 * * * * *",
availability_application: 1

config :archethic, Archethic.SelfRepair.NetworkView, enabled: false
config :archethic, Archethic.SelfRepair.Notifier, enabled: false

config :archethic, Archethic.SelfRepair.Sync,
Expand Down
77 changes: 71 additions & 6 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ defmodule Archethic do
alias __MODULE__.{SelfRepair, TransactionChain, SharedSecrets, TaskSupervisor}
alias __MODULE__.Contracts.{Interpreter, Contract}

alias SelfRepair.NetworkView
alias SelfRepair.NetworkChain

alias Message.{NewTransaction, NotFound, StartMining}
alias Message.{Balance, GetBalance, GetTransactionSummary}
alias Message.{StartMining, Ok, TransactionSummaryMessage}
alias Message.{StartMining, Ok, Error, TransactionSummaryMessage}

alias TransactionChain.{Transaction, TransactionInput, TransactionSummary}

Expand Down Expand Up @@ -57,15 +60,16 @@ defmodule Archethic do
welcome_node_key \\ Crypto.first_node_public_key()
) do
if P2P.authorized_and_available_node?() do
case SharedSecrets.verify_synchronization() do
case NetworkChain.verify_synchronization(:node_shared_secrets) do
:ok ->
do_send_transaction(tx, welcome_node_key)

:error ->
forward_transaction(tx, welcome_node_key)

{:error, last_address_to_sync} ->
SelfRepair.resync(last_address_to_sync)
{:error, addresses} ->
SharedSecrets.genesis_address(:node_shared_secrets) |> SelfRepair.resync(addresses, [])

forward_transaction(tx, welcome_node_key)
end
else
Expand Down Expand Up @@ -142,12 +146,73 @@ defmodule Archethic do
message = %StartMining{
transaction: tx,
welcome_node_public_key: get_welcome_node_public_key(tx_type, welcome_node_key),
validation_node_public_keys: Enum.map(validation_nodes, & &1.last_public_key)
validation_node_public_keys: Enum.map(validation_nodes, & &1.last_public_key),
network_chains_view_hash: NetworkView.get_chains_hash(),
p2p_view_hash: NetworkView.get_p2p_hash()
}

P2P.broadcast_message(validation_nodes, message)
Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
validation_nodes,
&P2P.send_message(&1, message),
ordered: false,
on_timeout: :kill_task,
timeout: Message.get_timeout(message) + 2000
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.map(fn {:ok, res} -> res end)
|> Enum.reduce(
%{
ok: 0,
network_chains_resync_needed: false,
p2p_resync_needed: false
},
&reduce_start_mining_responses/2
)
|> then(fn aggregated_responses ->
maybe_start_resync(aggregated_responses)

if should_forward_transaction?(aggregated_responses, length(validation_nodes)) do
forward_transaction(tx, welcome_node_key)
else
:ok
end
end)
end

defp reduce_start_mining_responses({:ok, %Ok{}}, acc) do
%{acc | ok: acc.ok + 1}
end

defp reduce_start_mining_responses({:ok, %Error{reason: :network_chains_sync}}, acc) do
%{acc | network_chains_resync_needed: true}
end

defp reduce_start_mining_responses({:ok, %Error{reason: :p2p_sync}}, acc) do
%{acc | p2p_resync_needed: true}
end

defp reduce_start_mining_responses({:ok, %Error{reason: :both_sync}}, acc) do
%{acc | network_chains_resync_needed: true, p2p_resync_needed: true}
end

defp reduce_start_mining_responses(_, acc) do
acc
end

defp maybe_start_resync(aggregated_responses) do
if aggregated_responses.network_chains_resync_needed do
NetworkChain.asynchronous_resync_many([:origin, :oracle, :node_shared_secrets])
end

if aggregated_responses.p2p_resync_needed do
NetworkChain.asynchronous_resync(:node)
end
end

defp should_forward_transaction?(_, 1), do: false
defp should_forward_transaction?(%{ok: ok_count}, _), do: ok_count < 2

# Since welcome node is not anymore constant, as we want unauthorised
# nodes to do some labor. Following bootstrapping, the txn of a new node
# is sent, with the welcome node being the same new node whose information
Expand Down
5 changes: 4 additions & 1 deletion lib/archethic/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ defmodule Archethic.Application do
http_port: http_port,
transport: transport
)},
MetricSupervisor
MetricSupervisor,

# a registry used in Utils to ensure a function is executed at most once concurrently
{Registry, keys: :unique, name: Archethic.RunExclusiveRegistry}
]

opts = [strategy: :rest_for_one, name: Archethic.Supervisor]
Expand Down
23 changes: 4 additions & 19 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Archethic.Bootstrap do

alias Archethic
alias Archethic.{Bootstrap, Crypto, Networking, P2P, P2P.Node}
alias Archethic.{SelfRepair, SelfRepair, TransactionChain}
alias Archethic.{SelfRepair, SelfRepair.NetworkChain, TransactionChain}

alias Bootstrap.{NetworkInit, Sync, TransactionHandler}
alias TransactionChain.{Transaction, TransactionData}
Expand Down Expand Up @@ -210,7 +210,9 @@ defmodule Archethic.Bootstrap do
end

Archethic.Bootstrap.NetworkConstraints.persist_genesis_address()
resync_network_chain()

Logger.info("Enforced Resync: Started!")
NetworkChain.synchronous_resync_many([:node, :oracle, :origin, :node_shared_secrets])

Sync.publish_end_of_sync()
SelfRepair.start_scheduler()
Expand All @@ -219,23 +221,6 @@ defmodule Archethic.Bootstrap do
Archethic.PubSub.notify_node_status(:node_up)
end

def resync_network_chain() do
Logger.info("Enforced Resync: Started!")

if P2P.authorized_and_available_node?() do
# evict this node
nodes =
Enum.reject(
P2P.authorized_and_available_nodes(),
&(&1.first_public_key == Crypto.first_node_public_key())
)

# blocking code
[:oracle, :node_shared_secrets, :reward]
|> Enum.each(&SelfRepair.resync_network_chain(&1, nodes))
end
end

defp first_initialization(
ip,
port,
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/bootstrap/network_constraints.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ defmodule Archethic.Bootstrap.NetworkConstraints do
OracleChain.update_summ_gen_addr()
Logger.info("Oracle Gen Addr Table: Loaded")

if gen_addr = OracleChain.genesis_address() do
if gen_addr = OracleChain.genesis_addresses() do
Logger.debug("New Oracle Gen Addr")
Logger.debug(gen_addr)
end
Expand Down
38 changes: 4 additions & 34 deletions lib/archethic/bootstrap/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ defmodule Archethic.Bootstrap.Sync do
alias Archethic.P2P.Message.EncryptedStorageNonce
alias Archethic.P2P.Message.GetBootstrappingNodes
alias Archethic.P2P.Message.GetStorageNonce
alias Archethic.P2P.Message.ListNodes
alias Archethic.P2P.Message.NodeList
alias Archethic.P2P.Message.NotifyEndOfNodeSync
alias Archethic.P2P.Node

Expand Down Expand Up @@ -137,44 +135,16 @@ defmodule Archethic.Bootstrap.Sync do
"""
@spec load_node_list() :: :ok | {:error, :network_issue}
def load_node_list() do
current_nodes = P2P.authorized_and_available_nodes()

last_updated_nodes =
fn new_node = %Node{
first_public_key: public_key,
last_update_date: update_date
},
acc ->
previous_node =
%Node{last_update_date: previous_update_date} = Map.get(acc, public_key, new_node)

node =
if DateTime.compare(update_date, previous_update_date) == :gt,
do: new_node,
else: previous_node

Map.put(acc, public_key, node)
end

conflict_resolver = fn results ->
nodes =
Enum.flat_map(results, fn %NodeList{nodes: nodes} -> nodes end)
|> Enum.reduce(%{}, fn node, acc -> last_updated_nodes.(node, acc) end)
|> Map.values()

%NodeList{nodes: nodes}
end

case P2P.quorum_read(current_nodes, %ListNodes{}, conflict_resolver) do
{:ok, %NodeList{nodes: nodes}} ->
case P2P.fetch_nodes_list() do
{:ok, nodes} ->
Enum.each(nodes, &P2P.add_and_connect_node/1)
# After loading all current nodes, we update the p2p view with the last one stored in DB
# to have a proper view for the next beacon summary to self repair
SelfRepair.last_sync_date() |> P2P.reload_last_view()
Logger.info("Node list refreshed")

error ->
error
{:error, :network_issue} ->
{:error, :network_issue}
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/archethic/mining/pending_transaction_validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ defmodule Archethic.Mining.PendingTransactionValidation do
%NotFound{}

res ->
Enum.sort_by(res, & &1.timestamp, {:desc, DateTime})
Enum.sort_by(res, & &1.transaction_summary.timestamp, {:desc, DateTime})
|> List.first()
end
end
Expand Down Expand Up @@ -704,7 +704,7 @@ defmodule Archethic.Mining.PendingTransactionValidation do
when type in [:oracle, :oracle_summary] do
# multipe txn chain based on summary date

case OracleChain.genesis_address() do
case OracleChain.genesis_addresses() do
nil ->
false

Expand Down
10 changes: 3 additions & 7 deletions lib/archethic/networking/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ defmodule Archethic.Networking.Scheduler do
@vsn Mix.Project.config()[:version]

alias Archethic.{Crypto, P2P, P2P.Node, Networking, TaskSupervisor, Utils, PubSub}
alias Archethic.{SelfRepair, TransactionChain}
alias Archethic.TransactionChain
alias Archethic.SelfRepair.NetworkChain

alias Networking.{IPLookup, PortForwarding}
alias TransactionChain.{Transaction, TransactionData}
Expand Down Expand Up @@ -139,11 +140,6 @@ defmodule Archethic.Networking.Scheduler do

Utils.await_confirmation(tx_address, nodes)

types = [:node, :oracle, :node_shared_secrets, :reward]

Task.Supervisor.async_stream_nolink(Archethic.TaskSupervisor, types, fn type ->
SelfRepair.resync_network_chain(type, nodes)
end)
|> Stream.run()
NetworkChain.asynchronous_resync_many([:node, :oracle, :node_shared_secrets, :origin])
end
end
18 changes: 16 additions & 2 deletions lib/archethic/oracle_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,22 @@ defmodule Archethic.OracleChain do
@doc """
Returns current and previous summary_time genesis address of oracle chain
"""
@spec genesis_address() :: map() | nil
defdelegate genesis_address(),
@spec genesis_addresses() :: map() | nil
defdelegate genesis_addresses(),
to: MemTable,
as: :get_addr

@doc """
Returns current genesis address of oracle chain
"""
@spec genesis_address() :: binary() | nil
def genesis_address() do
case genesis_addresses() do
%{current: {address, _time}} ->
address

_ ->
nil
end
end
end
43 changes: 43 additions & 0 deletions lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ defmodule Archethic.P2P do

alias __MODULE__.{BootstrappingSeeds, Client, GeoPatch, MemTable, MemTableLoader, Message, Node}

alias __MODULE__.Message.NodeList
alias __MODULE__.Message.ListNodes

require Logger

@type supported_transport :: :tcp
Expand Down Expand Up @@ -66,6 +69,46 @@ defmodule Archethic.P2P do
@spec list_nodes() :: list(Node.t())
defdelegate list_nodes, to: MemTable

@doc """
Fetch the list of nodes from close nodes
"""
@spec fetch_nodes_list() :: {:ok, list(Node.t())} | {:error, :network_issue}
def fetch_nodes_list() do
last_updated_nodes =
fn new_node = %Node{
first_public_key: public_key,
last_update_date: update_date
},
acc ->
previous_node =
%Node{last_update_date: previous_update_date} = Map.get(acc, public_key, new_node)

node =
if DateTime.compare(update_date, previous_update_date) == :gt,
do: new_node,
else: previous_node

Map.put(acc, public_key, node)
end

conflict_resolver = fn results ->
nodes =
Enum.flat_map(results, fn %NodeList{nodes: nodes} -> nodes end)
|> Enum.reduce(%{}, fn node, acc -> last_updated_nodes.(node, acc) end)
|> Map.values()

%NodeList{nodes: nodes}
end

case quorum_read(authorized_and_available_nodes(), %ListNodes{}, conflict_resolver) do
{:ok, %NodeList{nodes: nodes}} ->
{:ok, nodes}

{:error, :network_issue} ->
{:error, :network_issue}
end
end

@doc """
Return the list of available nodes
"""
Expand Down
9 changes: 9 additions & 0 deletions lib/archethic/p2p/message/error.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ defmodule Archethic.P2P.Message.Error do
| :invalid_transaction
| :invalid_attestation
| :transaction_already_exists
| :network_chains_sync
| :p2p_sync
| :both_sync

@type t :: %__MODULE__{
reason: reason()
Expand All @@ -33,6 +36,9 @@ defmodule Archethic.P2P.Message.Error do
def serialize_reason(:invalid_transaction), do: 1
def serialize_reason(:invalid_attestation), do: 2
def serialize_reason(:transaction_already_exists), do: 3
def serialize_reason(:network_chains_sync), do: 4
def serialize_reason(:p2p_sync), do: 5
def serialize_reason(:both_sync), do: 6

@doc """
Deserialize an error reason
Expand All @@ -42,4 +48,7 @@ defmodule Archethic.P2P.Message.Error do
def deserialize_reason(1), do: :invalid_transaction
def deserialize_reason(2), do: :invalid_attestation
def deserialize_reason(3), do: :transaction_already_exists
def deserialize_reason(4), do: :network_chains_sync
def deserialize_reason(5), do: :p2p_sync
def deserialize_reason(6), do: :both_sync
end
Loading