From 091d8e87676d9113f66b18a6febd48def2fbced7 Mon Sep 17 00:00:00 2001 From: Apoorv-2204 <90304197+apoorv-2204@users.noreply.github.com> Date: Fri, 2 Dec 2022 16:41:17 +0530 Subject: [PATCH] Fix self repair notifier (#645 ) * Fix notifier workflow for storage transaction * Add IO transactions * Add before? flag for self repair to avoid using the newly authorized nodes * Start Notifier after availability update to ensure nodes finished their self repair * Ensure addresses are expected to be replicated * Request missing chain addresses while new transaction stored * Fix beacon live downloading aggregate from BD instead of quorum * Store beacon aggregate when notifier is triggered Co-authored-by: Neylix --- Makefile | 5 +- config/config.exs | 1 + lib/archethic/beacon_chain.ex | 9 +- lib/archethic/beacon_chain/subset.ex | 7 +- lib/archethic/db.ex | 4 +- lib/archethic/db/embedded_impl.ex | 26 +- lib/archethic/db/embedded_impl/chain_index.ex | 51 +-- lib/archethic/p2p.ex | 42 +- lib/archethic/p2p/message.ex | 279 +++++++++----- lib/archethic/p2p/message/address_list.ex | 94 +++++ .../p2p/message/get_next_addresses.ex | 46 +++ lib/archethic/p2p/message/shard_repair.ex | 123 ++++++ .../p2p/message/transaction_input_list.ex | 4 +- lib/archethic/self_repair.ex | 83 +++- lib/archethic/self_repair/notifier.ex | 359 ++++++++++++------ lib/archethic/self_repair/repair_worker.ex | 184 +++++++++ lib/archethic/self_repair/supervisor.ex | 8 +- lib/archethic/self_repair/sync.ex | 37 +- lib/archethic/transaction_chain.ex | 63 ++- .../benchmarks/end_to_end_validation.ex | 1 - lib/archethic_web/live/chains/beacon_live.ex | 4 +- .../live/chains/node_shared_secrets_live.ex | 2 +- lib/archethic_web/live/chains/reward_live.ex | 2 +- .../p2p/message/address_list_test.exs | 7 + .../p2p/message/get_next_addresses_test.exs | 7 + .../p2p/message/shard_repair_test.exs | 7 + test/archethic/p2p/messages_test.exs | 137 ++++--- test/archethic/replication_test.exs | 2 +- test/archethic/self_repair/notifier_test.exs | 275 +++++++++++--- .../self_repair/repair_worker_test.exs | 164 ++++++++ test/archethic_web/live/rewards_live_test.exs | 1 - 31 files changed, 1612 insertions(+), 422 deletions(-) create mode 100644 lib/archethic/p2p/message/address_list.ex create mode 100644 lib/archethic/p2p/message/get_next_addresses.ex create mode 100644 lib/archethic/p2p/message/shard_repair.ex create mode 100644 lib/archethic/self_repair/repair_worker.ex create mode 100644 test/archethic/p2p/message/address_list_test.exs create mode 100644 test/archethic/p2p/message/get_next_addresses_test.exs create mode 100644 test/archethic/p2p/message/shard_repair_test.exs create mode 100644 test/archethic/self_repair/repair_worker_test.exs diff --git a/Makefile b/Makefile index b153deda3..a839a6360 100644 --- a/Makefile +++ b/Makefile @@ -9,11 +9,11 @@ compile_c_programs: mkdir -p priv/c_dist $(CC) src/c/crypto/stdio_helpers.c src/c/crypto/ed25519.c -o priv/c_dist/libsodium_port -I src/c/crypto/stdio_helpers.h -lsodium $(CC) src/c/hypergeometric_distribution.c -o priv/c_dist/hypergeometric_distribution -lgmp - + git submodule update --force --recursive --init --remote $(MAKE) -C src/c/nat/miniupnp/miniupnpc cp src/c/nat/miniupnp/miniupnpc/build/upnpc-static priv/c_dist/upnpc - + ifeq ($(TPM_INSTALLED),0) $(CC) src/c/crypto/stdio_helpers.c src/c/crypto/tpm/lib.c src/c/crypto/tpm/port.c -o priv/c_dist/tpm_port -I src/c/crypto/stdio_helpers.h -I src/c/crypto/tpm/lib.h $(TPMFLAGS) @@ -24,6 +24,7 @@ endif clean: rm -f priv/c_dist/* mix archethic.clean_db + mix clean docker-clean: clean docker container stop $$(docker ps -a --filter=name=utn* -q) diff --git a/config/config.exs b/config/config.exs index 5c15f020d..ba2457006 100644 --- a/config/config.exs +++ b/config/config.exs @@ -7,6 +7,7 @@ config :git_hooks, pre_push: [ tasks: [ {:cmd, "mix clean"}, + {:cmd, "mix git_hooks.install"}, {:cmd, "mix hex.outdated --within-requirements"}, {:cmd, "mix format --check-formatted"}, {:cmd, "mix compile --warnings-as-errors"}, diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index 7bf3f89c1..1215630d4 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -320,17 +320,10 @@ defmodule Archethic.BeaconChain do end defp get_summary_address_by_node(date, subset, authorized_nodes) do - # Remove the newly authorized nodes at this specific time - filter_nodes = - case authorized_nodes do - [node] -> [node] - nodes -> Enum.filter(nodes, &(DateTime.compare(&1.authorization_date, date) == :lt)) - end - summary_address = Crypto.derive_beacon_chain_address(subset, date, true) subset - |> Election.beacon_storage_nodes(date, filter_nodes) + |> Election.beacon_storage_nodes(date, authorized_nodes) |> Enum.map(fn node -> {node, summary_address} end) diff --git a/lib/archethic/beacon_chain/subset.ex b/lib/archethic/beacon_chain/subset.ex index 1d374a58e..129edc55c 100644 --- a/lib/archethic/beacon_chain/subset.ex +++ b/lib/archethic/beacon_chain/subset.ex @@ -323,12 +323,7 @@ defmodule Archethic.BeaconChain.Subset do end defp broadcast_beacon_slot(subset, next_time, slot) do - # Remove the newly authorized nodes at this specific time - nodes = - case P2P.authorized_and_available_nodes(next_time) do - [node] -> [node] - nodes -> Enum.filter(nodes, &(DateTime.compare(&1.authorization_date, next_time) == :lt)) - end + nodes = P2P.authorized_and_available_nodes(next_time, true) subset |> Election.beacon_storage_nodes(next_time, nodes) diff --git a/lib/archethic/db.ex b/lib/archethic/db.ex index cc53a83e3..b638bd9aa 100644 --- a/lib/archethic/db.ex +++ b/lib/archethic/db.ex @@ -42,7 +42,7 @@ defmodule Archethic.DB do @callback list_addresses_by_type(Transaction.transaction_type()) :: Enumerable.t() | list(binary()) @callback list_chain_addresses(binary()) :: - Enumerable.t() | list({binary(), non_neg_integer()}) + Enumerable.t() | list({binary(), DateTime.t()}) @callback get_last_chain_address(binary()) :: {binary(), DateTime.t()} @callback get_last_chain_address(binary(), DateTime.t()) :: {binary(), DateTime.t()} @@ -79,4 +79,6 @@ defmodule Archethic.DB do :ok @callback get_inputs(ledger :: :UCO | :token, address :: binary()) :: list(VersionedTransactionInput.t()) + + @callback stream_first_addresses() :: Enumerable.t() end diff --git a/lib/archethic/db/embedded_impl.ex b/lib/archethic/db/embedded_impl.ex index 2ac7d9900..0136e7838 100644 --- a/lib/archethic/db/embedded_impl.ex +++ b/lib/archethic/db/embedded_impl.ex @@ -216,7 +216,7 @@ defmodule Archethic.DB.EmbeddedImpl do Stream all the addresses from the Genesis address(following it). """ @spec list_chain_addresses(binary()) :: - Enumerable.t() | list({binary(), non_neg_integer()}) + Enumerable.t() | list({binary(), DateTime.t()}) def list_chain_addresses(address) when is_binary(address) do ChainIndex.list_chain_addresses(address, db_path()) end @@ -249,16 +249,16 @@ defmodule Archethic.DB.EmbeddedImpl do end @doc """ - Reference a last address from a previous address + Reference a last address from a genesis address """ @spec add_last_transaction_address( - previous_address :: binary(), + genesis_address :: binary(), address :: binary(), tx_time :: DateTime.t() ) :: :ok - def add_last_transaction_address(previous_address, address, date = %DateTime{}) - when is_binary(previous_address) and is_binary(address) do - ChainIndex.set_last_chain_address(previous_address, address, date, db_path()) + def add_last_transaction_address(genesis_address, address, date = %DateTime{}) + when is_binary(genesis_address) and is_binary(address) do + ChainIndex.set_last_chain_address(genesis_address, address, date, db_path()) end @doc """ @@ -384,4 +384,18 @@ defmodule Archethic.DB.EmbeddedImpl do @spec get_inputs(ledger :: :UCO | :token, address :: binary()) :: list(VersionedTransactionInput.t()) defdelegate get_inputs(ledger, address), to: InputsReader, as: :get_inputs + + @doc """ + Stream first transactions address of a chain from genesis_address. + """ + @spec stream_first_addresses :: Enumerable.t() + def stream_first_addresses do + ChainIndex.list_genesis_addresses() + |> Stream.map(fn gen_address -> + gen_address + |> list_chain_addresses() + |> Enum.at(0) + |> elem(0) + end) + end end diff --git a/lib/archethic/db/embedded_impl/chain_index.ex b/lib/archethic/db/embedded_impl/chain_index.ex index 1dad19041..1da38d4d8 100644 --- a/lib/archethic/db/embedded_impl/chain_index.ex +++ b/lib/archethic/db/embedded_impl/chain_index.ex @@ -10,6 +10,11 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do alias Archethic.DB.EmbeddedImpl.ChainWriter alias Archethic.TransactionChain.Transaction + @archethic_db_tx_index :archethic_db_tx_index + @archethic_db_chain_stats :archethic_db_chain_stats + @archethic_db_last_index :archethic_db_last_index + @archethic_db_type_stats :archethic_db_type_stats + def start_link(arg \\ []) do GenServer.start_link(__MODULE__, arg, name: __MODULE__) end @@ -17,10 +22,10 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do def init(opts) do db_path = Keyword.fetch!(opts, :path) - :ets.new(:archethic_db_tx_index, [:set, :named_table, :public, read_concurrency: true]) - :ets.new(:archethic_db_chain_stats, [:set, :named_table, :public, read_concurrency: true]) - :ets.new(:archethic_db_last_index, [:set, :named_table, :public, read_concurrency: true]) - :ets.new(:archethic_db_type_stats, [:set, :named_table, :public, read_concurrency: true]) + :ets.new(@archethic_db_tx_index, [:set, :named_table, :public, read_concurrency: true]) + :ets.new(@archethic_db_chain_stats, [:set, :named_table, :public, read_concurrency: true]) + :ets.new(@archethic_db_last_index, [:set, :named_table, :public, read_concurrency: true]) + :ets.new(@archethic_db_type_stats, [:set, :named_table, :public, read_concurrency: true]) fill_tables(db_path) @@ -60,12 +65,12 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do true = :ets.insert( - :archethic_db_tx_index, + @archethic_db_tx_index, {current_address, %{size: size, offset: offset, genesis_address: genesis_address}} ) :ets.update_counter( - :archethic_db_chain_stats, + @archethic_db_chain_stats, genesis_address, [ {2, size}, @@ -89,7 +94,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do true = :ets.insert( - :archethic_db_last_index, + @archethic_db_last_index, {genesis_address, last_address, DateTime.to_unix(timestamp, :millisecond)} ) end) @@ -100,10 +105,10 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do case File.open(type_path(db_path, type), [:read, :binary]) do {:ok, fd} -> nb_txs = do_scan_types(fd) - :ets.insert(:archethic_db_type_stats, {type, nb_txs}) + :ets.insert(@archethic_db_type_stats, {type, nb_txs}) {:error, _} -> - :ets.insert(:archethic_db_type_stats, {type, 0}) + :ets.insert(@archethic_db_type_stats, {type, 0}) end end) end @@ -142,12 +147,12 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do # Write fast lookup entry for this transaction on memory true = :ets.insert( - :archethic_db_tx_index, + @archethic_db_tx_index, {tx_address, %{size: size, offset: last_offset, genesis_address: genesis_address}} ) :ets.update_counter( - :archethic_db_chain_stats, + @archethic_db_chain_stats, genesis_address, [ {2, size}, @@ -162,7 +167,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do @spec get_file_stats(binary()) :: {offset :: non_neg_integer(), nb_transactions :: non_neg_integer()} def get_file_stats(genesis_address) do - case :ets.lookup(:archethic_db_chain_stats, genesis_address) do + case :ets.lookup(@archethic_db_chain_stats, genesis_address) do [{_, last_offset, nb_txs}] -> {last_offset, nb_txs} @@ -207,7 +212,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do """ @spec get_tx_entry(binary(), String.t()) :: {:ok, map()} | {:error, :not_exists} def get_tx_entry(address, db_path) do - case :ets.lookup(:archethic_db_tx_index, address) do + case :ets.lookup(@archethic_db_tx_index, address) do [] -> # If the transaction is not found in the in memory lookup # we scan the index file for the subset of the transaction to find the relative information @@ -299,7 +304,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do Stream all the transaction addresses from genesis_address-address. """ @spec list_chain_addresses(binary(), String.t()) :: - Enumerable.t() | list({binary(), non_neg_integer()}) + Enumerable.t() | list({binary(), DateTime.t()}) def list_chain_addresses(address, db_path) when is_binary(address) do filepath = chain_addresses_path(db_path, address) @@ -316,7 +321,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do {:ok, hash} <- :file.read(fd, hash_size) do address = <> # return tuple of address and timestamp - {[{address, timestamp}], {:ok, fd}} + {[{address, DateTime.from_unix!(timestamp, :millisecond)}], {:ok, fd}} else :eof -> :file.close(fd) @@ -335,7 +340,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do """ @spec count_transactions_by_type(Transaction.transaction_type()) :: non_neg_integer() def count_transactions_by_type(type) do - case :ets.lookup(:archethic_db_type_stats, type) do + case :ets.lookup(@archethic_db_type_stats, type) do [] -> 0 @@ -351,7 +356,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do :ok def add_tx_type(type, address, db_path) do File.write!(type_path(db_path, type), address, [:append, :binary]) - :ets.update_counter(:archethic_db_type_stats, type, {2, 1}, {type, 0}) + :ets.update_counter(@archethic_db_type_stats, type, {2, 1}, {type, 0}) :ok end @@ -372,7 +377,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do filename = chain_addresses_path(db_path, genesis_address) write_last_chain_transaction? = - case :ets.lookup(:archethic_db_last_index, genesis_address) do + case :ets.lookup(@archethic_db_last_index, genesis_address) do [{_, ^new_address, _}] -> false [{_, _, chain_unix_time}] when unix_time < chain_unix_time -> false _ -> true @@ -380,7 +385,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do if write_last_chain_transaction? do :ok = File.write!(filename, encoded_data, [:binary, :append]) - true = :ets.insert(:archethic_db_last_index, {genesis_address, new_address, unix_time}) + true = :ets.insert(@archethic_db_last_index, {genesis_address, new_address, unix_time}) end :ok @@ -396,7 +401,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do case get_tx_entry(address, db_path) do {:ok, %{genesis_address: genesis_address}} -> # Search in the latest in memory index - case :ets.lookup(:archethic_db_last_index, genesis_address) do + case :ets.lookup(@archethic_db_last_index, genesis_address) do [] -> # If not present, the we search in the index file unix_time = DateTime.utc_now() |> DateTime.to_unix(:millisecond) @@ -410,7 +415,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do {:error, :not_exists} -> # We try if the request address is the genesis address to fetch the in memory index - case :ets.lookup(:archethic_db_last_index, address) do + case :ets.lookup(@archethic_db_last_index, address) do [] -> # If not present, the we search in the index file unix_time = DateTime.utc_now() |> DateTime.to_unix(:millisecond) @@ -610,14 +615,14 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do end defp stream_genesis_addresses(acc = []) do - case :ets.first(:archethic_db_chain_stats) do + case :ets.first(@archethic_db_chain_stats) do :"$end_of_table" -> {:halt, acc} first_key -> {[first_key], first_key} end end defp stream_genesis_addresses(acc) do - case :ets.next(:archethic_db_chain_stats, acc) do + case :ets.next(@archethic_db_chain_stats, acc) do :"$end_of_table" -> {:halt, acc} next_key -> {[next_key], next_key} end diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index 7c01e7ff2..e6fe4c14c 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -165,20 +165,25 @@ defmodule Archethic.P2P do end @doc """ - List the authorized nodes for the given datetime (default to now) + List the authorized nodes for the given datetime (default to now) or before if needed """ @spec authorized_nodes(DateTime.t()) :: list(Node.t()) - def authorized_nodes(date \\ DateTime.utc_now()) do + def authorized_nodes(date \\ DateTime.utc_now(), before? \\ false) do nodes = MemTable.authorized_nodes() |> Enum.filter(fn %Node{authorization_date: authorization_date} -> - DateTime.compare(authorization_date, date) != :gt + if before?, + do: DateTime.compare(authorization_date, date) == :lt, + else: DateTime.compare(authorization_date, date) != :gt end) case nodes do [] -> # Only happen during bootstrap - get_first_enrolled_node() + case get_first_enrolled_node() do + nil -> [] + node -> [node] + end nodes -> nodes @@ -187,34 +192,47 @@ defmodule Archethic.P2P do @doc """ List the authorized and available nodes + before? is used in for self repair to not take in account the newly + authorized nodes """ - @spec authorized_and_available_nodes(DateTime.t()) :: list(Node.t()) - def authorized_and_available_nodes(date \\ DateTime.utc_now()) do + @spec authorized_and_available_nodes(DateTime.t(), boolean()) :: list(Node.t()) + def authorized_and_available_nodes(date \\ DateTime.utc_now(), before? \\ false) do nodes = - authorized_nodes(date) + authorized_nodes(date, before?) |> Enum.filter(fn %Node{available?: true, availability_update: availability_update} -> - DateTime.compare(date, availability_update) != :lt + if before?, + do: DateTime.compare(date, availability_update) == :gt, + else: DateTime.compare(date, availability_update) != :lt %Node{available?: false, availability_update: availability_update} -> - DateTime.compare(date, availability_update) == :lt + if before?, + do: DateTime.compare(date, availability_update) != :gt, + else: DateTime.compare(date, availability_update) == :lt end) case nodes do [] -> # Only happen for init transactions so we take the first enrolled node - get_first_enrolled_node() + case get_first_enrolled_node() do + nil -> [] + node -> [node] + end nodes -> nodes end end - defp get_first_enrolled_node() do + @doc """ + Return the first enrolled node + """ + @spec get_first_enrolled_node() :: Node.t() | nil + def get_first_enrolled_node() do list_nodes() |> Enum.reject(&(&1.enrollment_date == nil)) |> Enum.sort_by(& &1.enrollment_date, {:asc, DateTime}) - |> Enum.take(1) + |> List.first() end @doc """ diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index b66dbe0fa..140a309bf 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -2,108 +2,109 @@ defmodule Archethic.P2P.Message do @moduledoc """ Provide functions to encode and decode P2P messages using a custom binary protocol """ - alias Archethic.Account - - alias Archethic.BeaconChain - alias Archethic.BeaconChain.ReplicationAttestation - alias Archethic.BeaconChain.Summary - alias Archethic.BeaconChain.SummaryAggregate - alias Archethic.BeaconChain.Slot - alias Archethic.BeaconChain.Subset - alias Archethic.BeaconChain.Slot - - alias Archethic.Contracts - - alias Archethic.Crypto - - alias Archethic.Election - - alias Archethic.Mining - - alias Archethic.P2P - - alias __MODULE__.AcknowledgeStorage - alias __MODULE__.AddMiningContext - alias __MODULE__.Balance - alias __MODULE__.BeaconSummaryList - alias __MODULE__.BeaconUpdate - alias __MODULE__.BootstrappingNodes - alias __MODULE__.CrossValidate - alias __MODULE__.CrossValidationDone - alias __MODULE__.EncryptedStorageNonce - alias __MODULE__.Error - alias __MODULE__.FirstPublicKey - alias __MODULE__.FirstAddress - alias __MODULE__.GetFirstAddress - alias __MODULE__.GetBalance - alias __MODULE__.GetBeaconSummaries - alias __MODULE__.GetBeaconSummary - alias __MODULE__.GetBeaconSummariesAggregate - alias __MODULE__.GetBootstrappingNodes - alias __MODULE__.GetCurrentSummaries - alias __MODULE__.GetFirstPublicKey - alias __MODULE__.GetLastTransaction - alias __MODULE__.GetLastTransactionAddress - alias __MODULE__.GetP2PView - alias __MODULE__.GetStorageNonce - alias __MODULE__.GetTransaction - alias __MODULE__.GetTransactionChain - alias __MODULE__.GetTransactionChainLength - alias __MODULE__.GetTransactionInputs - alias __MODULE__.GetTransactionSummary - alias __MODULE__.GetUnspentOutputs - alias __MODULE__.LastTransactionAddress - alias __MODULE__.ListNodes - alias __MODULE__.NewBeaconSlot - alias __MODULE__.NewTransaction - alias __MODULE__.NodeList - alias __MODULE__.NotFound - alias __MODULE__.NotifyEndOfNodeSync - alias __MODULE__.NotifyPreviousChain - alias __MODULE__.NotifyLastTransactionAddress - alias __MODULE__.Ok - alias __MODULE__.P2PView - alias __MODULE__.Ping - alias __MODULE__.RegisterBeaconUpdates - alias __MODULE__.ReplicateTransaction - alias __MODULE__.ReplicateTransactionChain - alias __MODULE__.ReplicationError - alias __MODULE__.StartMining - alias __MODULE__.TransactionChainLength - alias __MODULE__.TransactionInputList - alias __MODULE__.TransactionSummaryList - alias __MODULE__.TransactionList - alias __MODULE__.UnspentOutputList - alias __MODULE__.ValidationError - - alias Archethic.P2P.Node - - alias Archethic.PubSub - - alias Archethic.Replication - - alias Archethic.TransactionChain - alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.Transaction.CrossValidationStamp - alias Archethic.TransactionChain.Transaction.ValidationStamp - - alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations - - alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput - - alias Archethic.TransactionChain.TransactionInput - alias Archethic.TransactionChain.VersionedTransactionInput - alias Archethic.TransactionChain.TransactionSummary - - alias Archethic.TaskSupervisor - - alias Archethic.Utils - alias Archethic.Utils.VarInt - require Logger + alias Archethic.{ + Account, + BeaconChain, + Contracts, + Crypto, + Election, + Mining, + P2P, + P2P.Node, + PubSub, + Replication, + TransactionChain, + TaskSupervisor, + Utils, + Utils.VarInt + } + + alias Archethic.BeaconChain.{ + ReplicationAttestation, + Summary, + SummaryAggregate, + Slot, + Subset, + Slot + } + + alias Archethic.SelfRepair + + alias Archethic.TransactionChain.{ + Transaction, + Transaction.CrossValidationStamp, + Transaction.ValidationStamp, + TransactionInput, + TransactionSummary, + VersionedTransactionInput, + Transaction.ValidationStamp.LedgerOperations, + Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput + } + + alias __MODULE__.{ + AcknowledgeStorage, + AddMiningContext, + AddressList, + Balance, + BeaconSummaryList, + BeaconUpdate, + BootstrappingNodes, + CrossValidate, + CrossValidationDone, + EncryptedStorageNonce, + Error, + FirstPublicKey, + FirstAddress, + GetFirstAddress, + GetBalance, + GetBeaconSummaries, + GetBeaconSummary, + GetBeaconSummariesAggregate, + GetBootstrappingNodes, + GetCurrentSummaries, + GetFirstPublicKey, + GetLastTransaction, + GetLastTransactionAddress, + GetNextAddresses, + GetP2PView, + GetStorageNonce, + GetTransaction, + GetTransactionChain, + GetTransactionChainLength, + GetTransactionInputs, + GetTransactionSummary, + GetUnspentOutputs, + LastTransactionAddress, + ListNodes, + NewBeaconSlot, + NewTransaction, + NodeList, + NotFound, + NotifyEndOfNodeSync, + NotifyLastTransactionAddress, + NotifyPreviousChain, + Ok, + P2PView, + Ping, + RegisterBeaconUpdates, + ReplicateTransaction, + ReplicateTransactionChain, + ReplicationError, + ShardRepair, + StartMining, + TransactionChainLength, + TransactionInputList, + TransactionSummaryList, + TransactionList, + UnspentOutputList, + ValidationError + } alias ArchethicWeb.TransactionSubscriber + require Logger + @type t :: request() | response() @type request :: @@ -141,6 +142,8 @@ defmodule Archethic.P2P.Message do | GetCurrentSummaries.t() | GetBeaconSummariesAggregate.t() | NotifyPreviousChain.t() + | ShardRepair.t() + | GetNextAddresses.t() @type response :: Ok.t() @@ -165,6 +168,7 @@ defmodule Archethic.P2P.Message do | FirstAddress.t() | ReplicationError.t() | SummaryAggregate.t() + | AddressList.t() @floor_upload_speed Application.compile_env!(:archethic, [__MODULE__, :floor_upload_speed]) @content_max_size Application.compile_env!(:archethic, :transaction_data_content_max_size) @@ -434,6 +438,18 @@ defmodule Archethic.P2P.Message do <<34::8, address::binary>> end + def encode(msg = %GetNextAddresses{}) do + <<35::8, GetNextAddresses.serialize(msg)::bitstring>> + end + + def encode(msg = %AddressList{}) do + <<229::8, AddressList.serialize(msg)::bitstring>> + end + + def encode(msg = %ShardRepair{}) do + <<230::8, ShardRepair.serialize(msg)::bitstring>> + end + def encode(aggregate = %SummaryAggregate{}) do <<231::8, SummaryAggregate.serialize(aggregate)::bitstring>> end @@ -968,6 +984,18 @@ defmodule Archethic.P2P.Message do {%NotifyPreviousChain{address: address}, rest} end + def decode(<<35::8, rest::bitstring>>) do + GetNextAddresses.deserialize(rest) + end + + def decode(<<229::8, rest::bitstring>>) do + AddressList.deserialize(rest) + end + + def decode(<<230::8, rest::bitstring>>) do + ShardRepair.deserialize(rest) + end + def decode(<<231::8, rest::bitstring>>) do SummaryAggregate.deserialize(rest) end @@ -1748,7 +1776,7 @@ defmodule Archethic.P2P.Message do def process(%NewBeaconSlot{slot: slot = %Slot{subset: subset, slot_time: slot_time}}, _) do summary_time = BeaconChain.next_summary_date(slot_time) - node_list = P2P.authorized_and_available_nodes(summary_time) + node_list = P2P.authorized_and_available_nodes(summary_time, true) beacon_summary_nodes = Election.beacon_storage_nodes( @@ -1835,6 +1863,59 @@ defmodule Archethic.P2P.Message do %Ok{} end + def process( + %ShardRepair{ + first_address: first_address, + storage_address: storage_address, + io_addresses: io_addresses + }, + _ + ) do + # Ensure all addresses are expected to be replicated + nodes = P2P.authorized_and_available_nodes() + + addresses = + if storage_address != nil, do: [storage_address | io_addresses], else: io_addresses + + public_key = Crypto.first_node_public_key() + + if Enum.all?( + addresses, + &(Election.storage_nodes(&1, nodes) |> Utils.key_in_node_list?(public_key)) + ) do + case SelfRepair.repair_in_progress?(first_address) do + false -> + SelfRepair.start_worker( + first_address: first_address, + storage_address: storage_address, + io_addresses: io_addresses + ) + + pid -> + SelfRepair.add_repair_addresses(pid, storage_address, io_addresses) + end + end + + %Ok{} + end + + def process(%GetNextAddresses{address: address}, _) do + case TransactionChain.get_transaction(address, validation_stamp: [:timestamp]) do + {:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: address_timestamp}}} -> + addresses = + TransactionChain.get_genesis_address(address) + |> TransactionChain.list_chain_addresses() + |> Enum.filter(fn {_address, timestamp} -> + DateTime.compare(timestamp, address_timestamp) == :gt + end) + + %AddressList{addresses: addresses} + + _ -> + %AddressList{addresses: []} + end + end + defp process_replication_chain(tx, replying_node_public_key) do Task.Supervisor.start_child(TaskSupervisor, fn -> response = diff --git a/lib/archethic/p2p/message/address_list.ex b/lib/archethic/p2p/message/address_list.ex new file mode 100644 index 000000000..af0a09236 --- /dev/null +++ b/lib/archethic/p2p/message/address_list.ex @@ -0,0 +1,94 @@ +defmodule Archethic.P2P.Message.AddressList do + @moduledoc """ + Inform a shard to start repair. + """ + @enforce_keys [:addresses] + defstruct [:addresses] + + alias Archethic.Crypto + + alias Archethic.Utils + alias Archethic.Utils.VarInt + + @type t :: %__MODULE__{addresses: list(Crypto.prepended_hash())} + + @doc """ + Serialize AddressList Struct + + iex> %AddressList{ + ...> addresses: [{<<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>, ~U[2022-11-27 12:34:56.789Z]}, + ...> {<<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 123, 321>>, ~U[2022-11-27 12:34:54.321Z]}] + ...> } |> AddressList.serialize() + # VarInt + <<1, 2, + # Addresses + 0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251, + 0, 0, 1, 132, 185, 21, 96, 149, + 0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 123, 321, + 0, 0, 1, 132, 185, 21, 86, 241>> + """ + def serialize(%__MODULE__{addresses: addresses}) do + addresses_bin = + addresses + |> Stream.map(fn {address, timestamp} -> + <> + end) + |> Enum.to_list() + |> :erlang.list_to_binary() + + <> + end + + @doc """ + Deserialize AddressList Struct + + iex> # VarInt + ...> <<1, 2, + ...> # Addresses + ...> 0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251, + ...> 0, 0, 1, 132, 185, 21, 96, 149, + ...> 0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 123, 321, + ...> 0, 0, 1, 132, 185, 21, 86, 241>> + ...> |> AddressList.deserialize() + { + %AddressList{ + addresses: [ + {<<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>, ~U[2022-11-27 12:34:56.789Z]}, + {<<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 123, 321>>, ~U[2022-11-27 12:34:54.321Z]} + ] + }, ""} + + """ + def deserialize(bin) do + {addresses_length, rest} = VarInt.get_value(bin) + + {addresses, rest} = deserialize_list(rest, addresses_length, []) + + {%__MODULE__{addresses: addresses}, rest} + end + + defp deserialize_list(rest, 0, _), do: {[], rest} + + defp deserialize_list(rest, nb_elt, acc) when length(acc) == nb_elt do + {Enum.reverse(acc), rest} + end + + defp deserialize_list(rest, nb_elt, acc) do + {elt, rest} = deserialize_elt(rest) + deserialize_list(rest, nb_elt, [elt | acc]) + end + + defp deserialize_elt(bin) do + {address, <>} = Utils.deserialize_address(bin) + + {{address, DateTime.from_unix!(timestamp, :millisecond)}, rest} + end +end diff --git a/lib/archethic/p2p/message/get_next_addresses.ex b/lib/archethic/p2p/message/get_next_addresses.ex new file mode 100644 index 000000000..87e3e3c22 --- /dev/null +++ b/lib/archethic/p2p/message/get_next_addresses.ex @@ -0,0 +1,46 @@ +defmodule Archethic.P2P.Message.GetNextAddresses do + @moduledoc """ + Inform a shard to start repair. + """ + @enforce_keys [:address] + defstruct [:address] + + alias Archethic.Crypto + + alias Archethic.Utils + + @type t :: %__MODULE__{address: Crypto.prepended_hash()} + + @doc """ + Serialize GetNextAddresses Struct + + iex> %GetNextAddresses{ + ...> address: <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>> + ...> } |> GetNextAddresses.serialize() + <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>> + """ + def serialize(%__MODULE__{address: address}) do + <> + end + + @doc """ + Deserialize GetNextAddresses Struct + + iex> <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>> + ...> |> GetNextAddresses.deserialize() + { + %GetNextAddresses{ + address: <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>> + }, ""} + + """ + def deserialize(bin) do + {address, rest} = Utils.deserialize_address(bin) + + {%__MODULE__{address: address}, rest} + end +end diff --git a/lib/archethic/p2p/message/shard_repair.ex b/lib/archethic/p2p/message/shard_repair.ex new file mode 100644 index 000000000..e714267b8 --- /dev/null +++ b/lib/archethic/p2p/message/shard_repair.ex @@ -0,0 +1,123 @@ +defmodule Archethic.P2P.Message.ShardRepair do + @moduledoc """ + Inform a shard to start repair. + """ + @enforce_keys [:first_address, :storage_address, :io_addresses] + defstruct [:first_address, :storage_address, :io_addresses] + + alias Archethic.Crypto + + alias Archethic.Utils + alias Archethic.Utils.VarInt + + @type t :: %__MODULE__{ + first_address: Crypto.prepended_hash(), + storage_address: Crypto.prepended_hash(), + io_addresses: list(Crypto.prepended_hash()) + } + + @doc """ + Serialize ShardRepair Struct + + iex> %ShardRepair{ + ...> first_address: <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>, + ...> storage_address: <<0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72, + ...> 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130>>, + ...> io_addresses: [ + ...> <<0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72, + ...> 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130>>, + ...> <<0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72, + ...> 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130>> + ...> ] + ...> } |> ShardRepair.serialize() + # First address + <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251, + # Storage address? + 1::1, + # Storage address + 0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72, + 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130, + # Varint + 1, 2, + #IO addresses + 0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72, + 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130, + 0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72, + 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130>> + """ + def serialize(%__MODULE__{ + first_address: first_address, + storage_address: nil, + io_addresses: io_addresses + }) do + <> + end + + def serialize(%__MODULE__{ + first_address: first_address, + storage_address: storage_address, + io_addresses: io_addresses + }) do + <> + end + + @doc """ + DeSerialize ShardRepair Struct + + iex> # First address + ...> <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251, + ...> # Storage address? + ...> 1::1, + ...> # Storage address + ...> 0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72, + ...> 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130, + ...> # Varint + ...> 1, 2, + ...> #IO addresses + ...> 0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72, + ...> 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130, + ...> 0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72, + ...> 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130>> + ...> |> ShardRepair.deserialize() + { + %ShardRepair{ + first_address: <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>, + storage_address: <<0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72, + 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130>>, + io_addresses: [ + <<0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72, + 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130>>, + <<0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72, + 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130>> + ] + }, ""} + + """ + def deserialize(bin) do + {first_address, <>} = Utils.deserialize_address(bin) + + {storage_address, rest} = + if storage_address? == 1 do + Utils.deserialize_address(rest) + else + {nil, rest} + end + + {io_addresses_length, rest} = VarInt.get_value(rest) + + {io_addresses, rest} = Utils.deserialize_addresses(rest, io_addresses_length, []) + + {%__MODULE__{ + first_address: first_address, + storage_address: storage_address, + io_addresses: io_addresses + }, rest} + end +end diff --git a/lib/archethic/p2p/message/transaction_input_list.ex b/lib/archethic/p2p/message/transaction_input_list.ex index afac9d71c..0da664eee 100644 --- a/lib/archethic/p2p/message/transaction_input_list.ex +++ b/lib/archethic/p2p/message/transaction_input_list.ex @@ -4,10 +4,10 @@ defmodule Archethic.P2P.Message.TransactionInputList do """ defstruct inputs: [], more?: false, offset: 0 - alias Archethic.TransactionChain.TransactionInput + alias Archethic.TransactionChain.VersionedTransactionInput @type t() :: %__MODULE__{ - inputs: list(TransactionInput.t()), + inputs: list(VersionedTransactionInput.t()), more?: boolean(), offset: non_neg_integer() } diff --git a/lib/archethic/self_repair.ex b/lib/archethic/self_repair.ex index 45dd86f8d..020e8a2e3 100755 --- a/lib/archethic/self_repair.ex +++ b/lib/archethic/self_repair.ex @@ -4,11 +4,19 @@ defmodule Archethic.SelfRepair do the bootstrapping phase and stores last synchronization date after each cycle. """ + alias __MODULE__.Notifier + alias __MODULE__.NotifierSupervisor + alias __MODULE__.RepairRegistry + alias __MODULE__.RepairWorker alias __MODULE__.Scheduler alias __MODULE__.Sync alias Archethic.BeaconChain + alias Archethic.Crypto + + alias Archethic.P2P.Node + alias Crontab.CronExpression.Parser, as: CronParser alias Crontab.Scheduler, as: CronScheduler @@ -67,12 +75,6 @@ defmodule Archethic.SelfRepair do @spec put_last_sync_date(DateTime.t()) :: :ok defdelegate put_last_sync_date(datetime), to: Sync, as: :store_last_sync_date - def config_change(changed_conf) do - changed_conf - |> Keyword.get(Scheduler) - |> Scheduler.config_change() - end - @doc """ Return the previous scheduler time from a given date """ @@ -83,4 +85,73 @@ defmodule Archethic.SelfRepair do |> CronScheduler.get_previous_run_date!(DateTime.to_naive(date_from)) |> DateTime.from_naive!("Etc/UTC") end + + @doc """ + Start a new notifier process if there is new unavailable nodes after the self repair + """ + @spec start_notifier(list(Node.t()), list(Node.t()), DateTime.t()) :: :ok + def start_notifier(prev_available_nodes, new_available_nodes, availability_update) do + diff_node = + (prev_available_nodes -- new_available_nodes) + |> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key())) + + case diff_node do + [] -> + :ok + + nodes -> + unavailable_nodes = Enum.map(nodes, & &1.first_public_key) + + DynamicSupervisor.start_child( + NotifierSupervisor, + {Notifier, + unavailable_nodes: unavailable_nodes, + prev_available_nodes: prev_available_nodes, + new_available_nodes: new_available_nodes, + availability_update: availability_update} + ) + + :ok + end + end + + @doc """ + Return pid of a running RepairWorker for the first_address, or false + """ + @spec repair_in_progress?(first_address :: binary()) :: false | pid() + def repair_in_progress?(first_address) do + case Registry.lookup(RepairRegistry, first_address) do + [{pid, _}] -> + pid + + _ -> + false + end + end + + @doc """ + Start a new RepairWorker for the first_address + """ + @spec start_worker(list()) :: DynamicSupervisor.on_start_child() + def start_worker(args) do + DynamicSupervisor.start_child(NotifierSupervisor, {RepairWorker, args}) + end + + @doc """ + Add a new address in the address list of the RepairWorker + """ + @spec add_repair_addresses( + pid(), + Crypto.prepended_hash() | nil, + list(Crypto.prepended_hash()) + ) :: :ok + def add_repair_addresses(pid, storage_address, io_addresses) do + GenServer.cast(pid, {:add_address, storage_address, io_addresses}) + end + + def config_change(changed_conf) do + changed_conf + |> Keyword.get(Scheduler) + |> Scheduler.config_change() + end end diff --git a/lib/archethic/self_repair/notifier.ex b/lib/archethic/self_repair/notifier.ex index 2463ef910..886727aef 100644 --- a/lib/archethic/self_repair/notifier.ex +++ b/lib/archethic/self_repair/notifier.ex @@ -16,158 +16,295 @@ defmodule Archethic.SelfRepair.Notifier do H -->|Replicate Transaction| C[Node2] H -->|Replicate Transaction| D[Node3] ``` - """ - use GenServer - @vsn Mix.Project.config()[:version] + alias Archethic.{ + BeaconChain, + Crypto, + Election, + P2P, + P2P.Node, + P2P.Message.ShardRepair, + TransactionChain, + TransactionChain.Transaction, + Utils + } - alias Archethic.Crypto + alias Archethic.TransactionChain.Transaction.{ + ValidationStamp, + ValidationStamp.LedgerOperations + } - alias Archethic.Election + use GenServer, restart: :temporary - alias Archethic.PubSub + require Logger - alias Archethic.P2P - alias Archethic.P2P.Message.ReplicateTransaction - alias Archethic.P2P.Node + def start_link(args) do + GenServer.start_link(__MODULE__, args) + end - alias Archethic.TaskSupervisor + def init(args) do + availability_update = Keyword.fetch!(args, :availability_update) - alias Archethic.TransactionChain - alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.Transaction.ValidationStamp + seconds = DateTime.diff(availability_update, DateTime.utc_now()) - alias Archethic.Utils + if seconds > 0 do + Process.send_after(self(), :start, seconds * 1000) + else + send(self(), :start) + end - require Logger + {:ok, args} + end - def start_link(args \\ []) do - GenServer.start_link(__MODULE__, args) + def handle_info(:start, data) do + unavailable_nodes = Keyword.fetch!(data, :unavailable_nodes) + prev_available_nodes = Keyword.fetch!(data, :prev_available_nodes) + new_available_nodes = Keyword.fetch!(data, :new_available_nodes) + + Logger.info( + "Start Notifier due to a topology change #{inspect(Enum.map(unavailable_nodes, &Base.encode16(&1)))}" + ) + + repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes) + repair_summaries_aggregate(prev_available_nodes, new_available_nodes) + + {:stop, :normal, data} end - def init(_) do - PubSub.register_to_node_update() - {:ok, %{notified: %{}}} + @doc """ + For each txn chain in db. Load its genesis address, load its + chain, recompute shards , notifiy nodes. Network txns are excluded. + """ + @spec repair_transactions(list(Crypto.key()), list(Node.t()), list(Node.t())) :: :ok + def repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes) do + # We fetch all the transactions existing and check if the disconnected nodes were in storage nodes + TransactionChain.stream_first_addresses() + |> Stream.reject(&network_chain?(&1)) + |> Stream.chunk_every(20) + |> Stream.each(fn chunk -> + concurrent_txn_processing( + chunk, + unavailable_nodes, + prev_available_nodes, + new_available_nodes + ) + end) + |> Stream.run() end - def handle_info( - {:node_update, - %Node{ - available?: false, - authorized?: true, - first_public_key: node_key, - authorization_date: authorization_date - }}, - state = %{notified: notified} - ) do - current_node_public_key = Crypto.first_node_public_key() - now = DateTime.utc_now() |> DateTime.truncate(:millisecond) - - with :lt <- DateTime.compare(authorization_date, now), - nil <- Map.get(notified, node_key), - false <- current_node_public_key == node_key do - repair_transactions(node_key, current_node_public_key) - {:noreply, Map.update!(state, :notified, &Map.put(&1, node_key, %{}))} - else + defp network_chain?(address) do + case TransactionChain.get_transaction(address, [:type]) do + {:ok, %Transaction{type: type}} -> + Transaction.network_type?(type) + _ -> - {:noreply, state} + false end end - def handle_info( - {:node_update, - %Node{authorized?: false, authorization_date: date, first_public_key: node_key}}, - state = %{notified: notified} - ) - when date != nil do - current_node_public_key = Crypto.first_node_public_key() + defp concurrent_txn_processing( + addresses, + unavailable_nodes, + prev_available_nodes, + new_available_nodes + ) do + Task.Supervisor.async_stream_nolink( + Archethic.TaskSupervisor, + addresses, + &sync_chain(&1, unavailable_nodes, prev_available_nodes, new_available_nodes), + ordered: false, + on_timeout: :kill_task + ) + |> Stream.run() + end - with nil <- Map.get(notified, node_key), - false <- current_node_public_key == node_key do - repair_transactions(node_key, current_node_public_key) - {:noreply, Map.update!(state, :notified, &Map.put(&1, node_key, %{}))} - else - _ -> - {:noreply, state} - end + defp sync_chain(address, unavailable_nodes, prev_available_nodes, new_available_nodes) do + address + |> TransactionChain.stream([ + :address, + validation_stamp: [ledger_operations: [:transaction_movements]] + ]) + |> Stream.map(&get_previous_election(&1, prev_available_nodes)) + |> Stream.filter(&storage_or_io_node?(&1, unavailable_nodes)) + |> Stream.filter(¬ify?(&1)) + |> Stream.map(&new_storage_nodes(&1, new_available_nodes)) + |> map_last_addresses_for_node() + |> notify_nodes(address) end - def handle_info( - {:node_update, - %Node{available?: true, first_public_key: node_key, authorization_date: date}}, - state - ) - when date != nil do - {:noreply, Map.update!(state, :notified, &Map.delete(&1, node_key))} + defp get_previous_election( + %Transaction{ + address: address, + validation_stamp: %ValidationStamp{ + ledger_operations: %LedgerOperations{transaction_movements: transaction_movements} + } + }, + prev_available_nodes + ) do + prev_storage_nodes = + Election.chain_storage_nodes(address, prev_available_nodes) + |> Enum.map(& &1.first_public_key) + + resolved_addresses = + transaction_movements + |> Enum.map(& &1.to) + + prev_io_nodes = + resolved_addresses + |> Election.io_storage_nodes(prev_available_nodes) + |> Enum.map(& &1.first_public_key) + + {address, resolved_addresses, prev_storage_nodes, prev_io_nodes -- prev_storage_nodes} end - def handle_info(_, state) do - {:noreply, state} + defp storage_or_io_node?({_, _, prev_storage_nodes, prev_io_nodes}, unavailable_nodes) do + nodes = prev_storage_nodes ++ prev_io_nodes + Enum.any?(unavailable_nodes, &Enum.member?(nodes, &1)) end - defp repair_transactions(node_key, current_node_public_key) do - Logger.debug("Trying to repair transactions due to a topology change", - node: Base.encode16(node_key) - ) + # Notify only if the current node is part of the previous storage / io nodes + # to reduce number of messages + defp notify?({_, _, prev_storage_nodes, prev_io_nodes}) do + Enum.member?(prev_storage_nodes ++ prev_io_nodes, Crypto.first_node_public_key()) + end - node_key - |> get_transactions_to_sync() - |> Stream.each(&forward_transaction(&1, current_node_public_key)) - |> Stream.run() + @doc """ + New election is carried out on the set of all authorized omiting unavailable_node. + The set of previous storage nodes is subtracted from the set of new storage nodes. + """ + @spec new_storage_nodes( + {binary(), list(Crypto.prepended_hash()), list(Crypto.key()), list(Crypto.key())}, + list(Node.t()) + ) :: + {binary(), list(Crypto.key()), list(Crypto.key())} + def new_storage_nodes( + {address, resolved_addresses, prev_storage_nodes, prev_io_nodes}, + new_available_nodes + ) do + new_storage_nodes = + Election.chain_storage_nodes(address, new_available_nodes) + |> Enum.map(& &1.first_public_key) + |> Enum.reject(&Enum.member?(prev_storage_nodes, &1)) + + already_stored_nodes = prev_storage_nodes ++ prev_io_nodes ++ new_storage_nodes + + new_io_nodes = + resolved_addresses + |> Election.io_storage_nodes(new_available_nodes) + |> Enum.map(& &1.first_public_key) + |> Enum.reject(&Enum.member?(already_stored_nodes, &1)) + + {address, new_storage_nodes, new_io_nodes} end - defp get_transactions_to_sync(node_public_key) do - # We fetch all the transactions existing and check if the disconnecting node was a storage node - TransactionChain.list_all([:address, :type, validation_stamp: [:timestamp]]) - |> Stream.map( - fn tx = %Transaction{ - address: address, - type: type, - validation_stamp: %ValidationStamp{timestamp: timestamp} - } -> - node_list = - Enum.filter( - P2P.list_nodes(), - &(&1.authorization_date != nil and - DateTime.compare(&1.authorization_date, timestamp) == :lt) - ) + @doc """ + Create a map returning for each node the last transaction address it should replicate + """ + @spec map_last_addresses_for_node(Enumerable.t()) :: Enumerable.t() + def map_last_addresses_for_node(stream) do + Enum.reduce( + stream, + %{}, + fn {address, new_storage_nodes, new_io_nodes}, acc -> + acc = + Enum.reduce(new_storage_nodes, acc, fn first_public_key, acc -> + Map.update( + acc, + first_public_key, + %{last_address: address, io_addresses: []}, + &Map.put(&1, :last_address, address) + ) + end) - {tx, Election.chain_storage_nodes_with_type(address, type, node_list)} + Enum.reduce(new_io_nodes, acc, fn first_public_key, acc -> + Map.update( + acc, + first_public_key, + %{last_address: nil, io_addresses: [address]}, + &Map.update(&1, :io_addresses, [address], fn addresses -> [address | addresses] end) + ) + end) end ) - |> Stream.filter(fn {_tx, nodes} -> - Utils.key_in_node_list?(nodes, node_public_key) - end) end - defp forward_transaction( - {tx = %Transaction{address: address, type: type}, previous_storage_nodes}, - current_node_public_key - ) do - # We compute the new storage nodes minus the previous ones - new_storage_nodes = - Election.chain_storage_nodes_with_type( - address, - type, - P2P.authorized_nodes() -- previous_storage_nodes - ) + defp notify_nodes(acc, first_address) do + Task.Supervisor.async_stream_nolink( + Archethic.TaskSupervisor, + acc, + fn {node_first_public_key, %{last_address: last_address, io_addresses: io_addresses}} -> + Logger.info( + "Send Shard Repair message to #{Base.encode16(node_first_public_key)}" <> + "with storage_address #{if last_address, do: Base.encode16(last_address), else: nil}, " <> + "io_addresses #{inspect(Enum.map(io_addresses, &Base.encode16(&1)))}", + address: Base.encode16(first_address) + ) - with false <- Enum.empty?(new_storage_nodes), - true <- Utils.key_in_node_list?(previous_storage_nodes, current_node_public_key) do - Logger.info("Repair started due to network topology change", - transaction_address: Base.encode16(address), - transaction_type: type - ) + P2P.send_message(node_first_public_key, %ShardRepair{ + first_address: first_address, + storage_address: last_address, + io_addresses: io_addresses + }) + end, + ordered: false, + on_timeout: :kill_task + ) + |> Stream.run() + end + @doc """ + For each beacon aggregate, calculate the new election and store it if the node needs to + """ + @spec repair_summaries_aggregate(list(Node.t()), list(Node.t())) :: :ok + def repair_summaries_aggregate(prev_available_nodes, new_available_nodes) do + %Node{enrollment_date: first_enrollment_date} = P2P.get_first_enrolled_node() + + first_enrollment_date + |> BeaconChain.next_summary_dates() + |> Stream.filter(&download?(&1, new_available_nodes)) + |> Stream.chunk_every(20) + |> Stream.each(fn summary_times -> Task.Supervisor.async_stream_nolink( - TaskSupervisor, - new_storage_nodes, - &P2P.send_message(&1, %ReplicateTransaction{transaction: tx}), + Archethic.TaskSupervisor, + summary_times, + &download_and_store_summary(&1, prev_available_nodes), ordered: false, on_timeout: :kill_task ) |> Stream.run() + end) + |> Stream.run() + end + + defp download?(summary_time, new_available_nodes) do + in_new_election? = + summary_time + |> Crypto.derive_beacon_aggregate_address() + |> Election.chain_storage_nodes(new_available_nodes) + |> Utils.key_in_node_list?(Crypto.first_node_public_key()) + + if in_new_election? do + case BeaconChain.get_summaries_aggregate(summary_time) do + {:ok, _} -> false + {:error, _} -> true + end + else + false + end + end + + defp download_and_store_summary(summary_time, prev_available_nodes) do + case BeaconChain.fetch_summaries_aggregate(summary_time, prev_available_nodes) do + {:ok, aggregate} -> + Logger.debug("Notifier store beacon aggregate for #{summary_time}") + BeaconChain.write_summaries_aggregate(aggregate) + + error -> + Logger.warning( + "Notifier cannot fetch summary aggregate for date #{summary_time} " <> + "because of #{inspect(error)}" + ) end end end diff --git a/lib/archethic/self_repair/repair_worker.ex b/lib/archethic/self_repair/repair_worker.ex new file mode 100644 index 000000000..d2113bd33 --- /dev/null +++ b/lib/archethic/self_repair/repair_worker.ex @@ -0,0 +1,184 @@ +defmodule Archethic.SelfRepair.RepairWorker do + @moduledoc false + + alias Archethic.{ + Contracts, + BeaconChain, + Election, + P2P, + Replication, + TransactionChain + } + + alias Archethic.SelfRepair.RepairRegistry + + use GenServer, restart: :transient + + require Logger + + def start_link(args) do + GenServer.start_link(__MODULE__, args, []) + end + + def init(args) do + first_address = Keyword.fetch!(args, :first_address) + storage_address = Keyword.fetch!(args, :storage_address) + io_addresses = Keyword.fetch!(args, :io_addresses) + + Registry.register(RepairRegistry, first_address, []) + + Logger.info( + "Notifier Repair Worker start with storage_address #{if storage_address, do: Base.encode16(storage_address), else: nil}, " <> + "io_addresses #{inspect(Enum.map(io_addresses, &Base.encode16(&1)))}", + address: Base.encode16(first_address) + ) + + # We get the authorized nodes before the last summary date as we are sure that they know + # the informations we need. Requesting current nodes may ask information to nodes in same repair + # process as we are here. + authorized_nodes = + DateTime.utc_now() + |> BeaconChain.previous_summary_time() + |> P2P.authorized_and_available_nodes(true) + + storage_addresses = if storage_address != nil, do: [storage_address], else: [] + + data = %{ + storage_addresses: storage_addresses, + io_addresses: io_addresses, + authorized_nodes: authorized_nodes + } + + {:ok, start_repair(data)} + end + + def handle_cast({:add_address, storage_address, io_addresses}, data) do + new_data = + if storage_address != nil, + do: Map.update!(data, :storage_addresses, &([storage_address | &1] |> Enum.uniq())), + else: data + + new_data = + if io_addresses != [], + do: Map.update!(new_data, :io_addresses, &((&1 ++ io_addresses) |> Enum.uniq())), + else: new_data + + {:noreply, new_data} + end + + def handle_info( + {:DOWN, _ref, :process, pid, _normal}, + data = %{task: task_pid, storage_addresses: [], io_addresses: []} + ) + when pid == task_pid do + {:stop, :normal, data} + end + + def handle_info( + {:DOWN, _ref, :process, pid, _normal}, + data = %{task: task_pid} + ) + when pid == task_pid do + {:noreply, start_repair(data)} + end + + def handle_info(_, data), do: {:noreply, data} + + defp start_repair( + data = %{ + storage_addresses: [], + io_addresses: [address | rest], + authorized_nodes: authorized_nodes + } + ) do + pid = repair_task(address, false, authorized_nodes) + + data + |> Map.put(:io_addresses, rest) + |> Map.put(:task, pid) + end + + defp start_repair( + data = %{ + storage_addresses: [address | rest], + authorized_nodes: authorized_nodes + } + ) do + pid = repair_task(address, true, authorized_nodes) + + data + |> Map.put(:storage_addresses, rest) + |> Map.put(:task, pid) + end + + defp repair_task(address, storage?, authorized_nodes) do + %Task{pid: pid} = + Task.async(fn -> + replicate_transaction(address, storage?, authorized_nodes) + end) + + pid + end + + defp replicate_transaction(address, storage?, authorized_nodes) do + Logger.debug("Notifier RepairWorker start replication, storage? #{storage?}", + address: Base.encode16(address) + ) + + with false <- TransactionChain.transaction_exists?(address), + storage_nodes <- Election.chain_storage_nodes(address, authorized_nodes), + {:ok, tx} <- TransactionChain.fetch_transaction_remotely(address, storage_nodes) do + if storage? do + case Replication.validate_and_store_transaction_chain(tx, true, authorized_nodes) do + :ok -> update_last_address(address, authorized_nodes) + error -> error + end + else + Replication.validate_and_store_transaction(tx, true) + end + else + true -> + Logger.debug("Notifier RepairWorker transaction already exists", + address: Base.encode16(address) + ) + + {:error, reason} -> + Logger.warning( + "Notifier RepairWorker failed to replicate transaction because of #{inspect(reason)}" + ) + end + end + + @doc """ + Request missing transaction addresses from last local address until last chain address + and add them in the DB + """ + def update_last_address(address, authorized_nodes) do + # As the node is storage node of this chain, it needs to know all the addresses of the chain until the last + # So we get the local last address and verify if it's the same as the last address of the chain + # by requesting the nodes which already know the last address + + {last_local_address, _timestamp} = TransactionChain.get_last_address(address) + storage_nodes = Election.storage_nodes(last_local_address, authorized_nodes) + + case TransactionChain.fetch_next_chain_addresses_remotely(last_local_address, storage_nodes) do + {:ok, []} -> + :ok + + {:ok, addresses} -> + genesis_address = TransactionChain.get_genesis_address(address) + + addresses + |> Enum.sort_by(fn {_address, timestamp} -> timestamp end) + |> Enum.each(fn {address, timestamp} -> + TransactionChain.register_last_address(genesis_address, address, timestamp) + end) + + # Stop potential previous smart contract + Contracts.stop_contract(address) + + _ -> + :ok + end + end +end diff --git a/lib/archethic/self_repair/supervisor.ex b/lib/archethic/self_repair/supervisor.ex index bfc0efbbd..cf6188fc9 100644 --- a/lib/archethic/self_repair/supervisor.ex +++ b/lib/archethic/self_repair/supervisor.ex @@ -3,9 +3,7 @@ defmodule Archethic.SelfRepair.Supervisor do use Supervisor - alias Archethic.SelfRepair.Notifier alias Archethic.SelfRepair.Scheduler - alias Archethic.Utils def start_link(arg) do @@ -15,7 +13,11 @@ defmodule Archethic.SelfRepair.Supervisor do def init(_arg) do children = [ {Scheduler, Application.get_env(:archethic, Scheduler)}, - Notifier + {DynamicSupervisor, strategy: :one_for_one, name: Archethic.SelfRepair.NotifierSupervisor}, + {Registry, + name: Archethic.SelfRepair.RepairRegistry, + keys: :unique, + partitions: System.schedulers_online()} ] Supervisor.init(Utils.configurable_children(children), strategy: :one_for_one) diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index ff4bfa495..bdf8738a7 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -25,6 +25,8 @@ defmodule Archethic.SelfRepair.Sync do alias Archethic.TransactionChain.TransactionSummary + alias Archethic.SelfRepair + alias Archethic.Utils require Logger @@ -123,13 +125,13 @@ defmodule Archethic.SelfRepair.Sync do defp do_load_missed_transactions(last_sync_date, last_summary_time) do start = System.monotonic_time() - download_nodes = P2P.authorized_and_available_nodes() + download_nodes = P2P.authorized_and_available_nodes(last_summary_time, true) summaries_aggregates = fetch_summaries_aggregates(last_sync_date, last_summary_time, download_nodes) last_aggregate = BeaconChain.fetch_and_aggregate_summaries(last_summary_time, download_nodes) - ensure_download_last_aggregate(last_aggregate) + ensure_download_last_aggregate(last_aggregate, download_nodes) last_aggregate = aggregate_with_local_summaries(last_aggregate, last_summary_time) @@ -166,20 +168,15 @@ defmodule Archethic.SelfRepair.Sync do |> Stream.map(fn {:ok, {:ok, aggregate}} -> aggregate end) end - defp ensure_download_last_aggregate( - last_aggregate = %SummaryAggregate{summary_time: summary_time} - ) do + defp ensure_download_last_aggregate(last_aggregate, download_nodes) do # Make sure the last beacon aggregate have been synchronized # from remote nodes to avoid self-repair to be acknowledged if those # cannot be reached - - nodes = P2P.authorized_and_available_nodes(summary_time) - # If number of authorized node is <= 2 and current node is part of it # we accept the self repair as the other node may be unavailable and so # we need to do the self even if no other node respond with true <- P2P.authorized_node?(), - true <- length(nodes) <= 2 do + true <- length(download_nodes) <= 2 do :ok else _ -> @@ -248,6 +245,8 @@ defmodule Archethic.SelfRepair.Sync do availability_update = DateTime.add(summary_time, availability_adding_time) + previous_available_nodes = P2P.authorized_and_available_nodes() + p2p_availabilities |> Enum.reduce(%{}, fn {subset, %{ @@ -269,9 +268,19 @@ defmodule Archethic.SelfRepair.Sync do |> Enum.map(&update_availabilities(&1, availability_update)) |> DB.register_p2p_summary() + new_available_nodes = P2P.authorized_and_available_nodes(availability_update) + + if :persistent_term.get(:archethic_up, nil) == :up do + SelfRepair.start_notifier( + previous_available_nodes, + new_available_nodes, + availability_update + ) + end + update_statistics(summary_time, transaction_summaries) - store_aggregate(aggregate) + store_aggregate(aggregate, new_available_nodes) end defp synchronize_transactions([], _), do: :ok @@ -378,9 +387,11 @@ defmodule Archethic.SelfRepair.Sync do PubSub.notify_new_tps(tps, nb_transactions) end - defp store_aggregate(aggregate = %SummaryAggregate{summary_time: summary_time}) do - node_list = - [P2P.get_node_info() | P2P.authorized_and_available_nodes()] |> P2P.distinct_nodes() + defp store_aggregate( + aggregate = %SummaryAggregate{summary_time: summary_time}, + new_available_nodes + ) do + node_list = [P2P.get_node_info() | new_available_nodes] |> P2P.distinct_nodes() should_store? = summary_time diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 1742146f8..ee6ec16b6 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -10,25 +10,27 @@ defmodule Archethic.TransactionChain do alias Archethic.Election alias Archethic.P2P - alias Archethic.P2P.Node alias Archethic.P2P.Message + alias Archethic.P2P.Node alias Archethic.P2P.Message.{ + AddressList, Error, - NotFound, - TransactionList, - UnspentOutputList, - TransactionInputList, - TransactionChainLength, - LastTransactionAddress, FirstAddress, - GetTransaction, GetFirstAddress, - GetUnspentOutputs, + GetLastTransactionAddress, + GetNextAddresses, + GetTransaction, GetTransactionChain, + GetTransactionChainLength, GetTransactionInputs, - GetLastTransactionAddress, - GetTransactionChainLength + GetUnspentOutputs, + LastTransactionAddress, + NotFound, + TransactionChainLength, + TransactionInputList, + TransactionList, + UnspentOutputList } alias __MODULE__.MemTables.KOLedger @@ -78,7 +80,7 @@ defmodule Archethic.TransactionChain do @doc """ Stream all the addresses in chronological belonging to a genesis address """ - @spec list_chain_addresses(binary()) :: Enumerable.t() | list({binary(), non_neg_integer()}) + @spec list_chain_addresses(binary()) :: Enumerable.t() | list({binary(), DateTime.t()}) defdelegate list_chain_addresses(genesis_address), to: DB @doc """ @@ -98,10 +100,10 @@ defmodule Archethic.TransactionChain do as: :get_last_chain_address @doc """ - Register a last address from a previous address at a given date + Register a last address from a genesis address at a given date """ @spec register_last_address(binary(), binary(), DateTime.t()) :: :ok - defdelegate register_last_address(previous_address, next_address, timestamp), + defdelegate register_last_address(genesis_address, next_address, timestamp), to: DB, as: :add_last_transaction_address @@ -111,6 +113,16 @@ defmodule Archethic.TransactionChain do @spec get_first_public_key(Crypto.key()) :: Crypto.key() defdelegate get_first_public_key(previous_public_key), to: DB, as: :get_first_public_key + @doc """ + Stream first transactions address of a chain from genesis_address. + The Genesis Addresses is not a transaction or the first transaction. + The first transaction is calulated by index = 0+1 + """ + @spec stream_first_addresses() :: Enumerable.t() + defdelegate stream_first_addresses(), + to: DB, + as: :stream_first_addresses + @doc """ Get a transaction @@ -589,6 +601,29 @@ defmodule Archethic.TransactionChain do end end + @doc """ + Request the chain addresses from paging address to last chain address + """ + @spec fetch_next_chain_addresses_remotely(Crypto.prepended_hash(), list(Node.t())) :: + {:ok, list(Crypto.prepended_hash())} | {:error, :network_issue} + def fetch_next_chain_addresses_remotely(address, nodes) do + conflict_resolver = fn results -> + Enum.sort_by(results, &length(&1.addresses), :desc) |> List.first() + end + + case P2P.quorum_read( + nodes, + %GetNextAddresses{address: address}, + conflict_resolver + ) do + {:ok, %AddressList{addresses: addresses}} -> + {:ok, addresses} + + {:error, :network_issue} = e -> + e + end + end + @doc """ Get a transaction summary from a transaction address """ diff --git a/lib/archethic/utils/regression/benchmarks/end_to_end_validation.ex b/lib/archethic/utils/regression/benchmarks/end_to_end_validation.ex index 67034fe1c..32ec97dbe 100644 --- a/lib/archethic/utils/regression/benchmarks/end_to_end_validation.ex +++ b/lib/archethic/utils/regression/benchmarks/end_to_end_validation.ex @@ -16,7 +16,6 @@ defmodule Archethic.Utils.Regression.Benchmark.EndToEndValidation do alias Archethic.TransactionChain.TransactionData.UCOLedger.Transfer, as: UCOTransfer @behaviour Benchmark - def plan([host | _nodes], _opts) do port = Application.get_env(:archethic, ArchethicWeb.Endpoint)[:http][:port] diff --git a/lib/archethic_web/live/chains/beacon_live.ex b/lib/archethic_web/live/chains/beacon_live.ex index a073765a8..373fbb03a 100644 --- a/lib/archethic_web/live/chains/beacon_live.ex +++ b/lib/archethic_web/live/chains/beacon_live.ex @@ -233,7 +233,9 @@ defmodule ArchethicWeb.BeaconChainLive do defp list_transactions_from_summaries(nil), do: [] defp list_transactions_from_aggregate(date = %DateTime{}) do - case BeaconChain.get_summaries_aggregate(date) do + nodes = P2P.authorized_and_available_nodes() + + case BeaconChain.fetch_summaries_aggregate(date, nodes) do {:ok, %SummaryAggregate{transaction_summaries: tx_summaries}} -> Enum.sort_by(tx_summaries, & &1.timestamp, {:desc, DateTime}) diff --git a/lib/archethic_web/live/chains/node_shared_secrets_live.ex b/lib/archethic_web/live/chains/node_shared_secrets_live.ex index be99f34fc..7aa9e5b2c 100644 --- a/lib/archethic_web/live/chains/node_shared_secrets_live.ex +++ b/lib/archethic_web/live/chains/node_shared_secrets_live.ex @@ -153,7 +153,7 @@ defmodule ArchethicWeb.NodeSharedSecretsChainLive do display_data( addr, nb_authorized_nodes, - DateTime.from_unix(timestamp, :millisecond) |> elem(1) + timestamp ) end) |> Enum.reverse() diff --git a/lib/archethic_web/live/chains/reward_live.ex b/lib/archethic_web/live/chains/reward_live.ex index f46942d97..a6b41c122 100644 --- a/lib/archethic_web/live/chains/reward_live.ex +++ b/lib/archethic_web/live/chains/reward_live.ex @@ -132,7 +132,7 @@ defmodule ArchethicWeb.RewardChainLive do display_data( addr, (TransactionChain.get_transaction(addr, [:type]) |> elem(1)).type, - DateTime.from_unix(timestamp, :millisecond) |> elem(1) + timestamp ) end) |> Enum.reverse() diff --git a/test/archethic/p2p/message/address_list_test.exs b/test/archethic/p2p/message/address_list_test.exs new file mode 100644 index 000000000..ad2f699cf --- /dev/null +++ b/test/archethic/p2p/message/address_list_test.exs @@ -0,0 +1,7 @@ +defmodule Archethic.P2P.Message.AddressListTest do + @moduledoc false + use ExUnit.Case + + alias Archethic.P2P.Message.AddressList + doctest AddressList +end diff --git a/test/archethic/p2p/message/get_next_addresses_test.exs b/test/archethic/p2p/message/get_next_addresses_test.exs new file mode 100644 index 000000000..9df5e01fa --- /dev/null +++ b/test/archethic/p2p/message/get_next_addresses_test.exs @@ -0,0 +1,7 @@ +defmodule Archethic.P2P.Message.GetNextAddressesTest do + @moduledoc false + use ExUnit.Case + + alias Archethic.P2P.Message.GetNextAddresses + doctest GetNextAddresses +end diff --git a/test/archethic/p2p/message/shard_repair_test.exs b/test/archethic/p2p/message/shard_repair_test.exs new file mode 100644 index 000000000..3da2a3520 --- /dev/null +++ b/test/archethic/p2p/message/shard_repair_test.exs @@ -0,0 +1,7 @@ +defmodule Archethic.P2P.Message.ShardRepairTest do + @moduledoc false + use ExUnit.Case + + alias Archethic.P2P.Message.ShardRepair + doctest ShardRepair +end diff --git a/test/archethic/p2p/messages_test.exs b/test/archethic/p2p/messages_test.exs index dd3d1ca7a..5e9fd8536 100644 --- a/test/archethic/p2p/messages_test.exs +++ b/test/archethic/p2p/messages_test.exs @@ -1,64 +1,68 @@ defmodule Archethic.P2P.MessageTest do use ArchethicCase - alias Archethic.Crypto - - alias Archethic.P2P.Message - alias Archethic.P2P.Message.AcknowledgeStorage - alias Archethic.P2P.Message.AddMiningContext - alias Archethic.P2P.Message.Balance - alias Archethic.P2P.Message.BootstrappingNodes - alias Archethic.P2P.Message.CrossValidate - alias Archethic.P2P.Message.CrossValidationDone - alias Archethic.P2P.Message.EncryptedStorageNonce - alias Archethic.P2P.Message.Error - alias Archethic.P2P.Message.FirstPublicKey - alias Archethic.P2P.Message.GetBalance - alias Archethic.P2P.Message.GetBootstrappingNodes - alias Archethic.P2P.Message.GetFirstPublicKey - alias Archethic.P2P.Message.GetLastTransaction - alias Archethic.P2P.Message.GetLastTransactionAddress - alias Archethic.P2P.Message.GetP2PView - alias Archethic.P2P.Message.GetStorageNonce - alias Archethic.P2P.Message.GetTransaction - alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.GetTransactionChainLength - alias Archethic.P2P.Message.GetTransactionInputs - alias Archethic.P2P.Message.GetTransactionSummary - alias Archethic.P2P.Message.GetUnspentOutputs - alias Archethic.P2P.Message.LastTransactionAddress - alias Archethic.P2P.Message.ListNodes - alias Archethic.P2P.Message.NewTransaction - alias Archethic.P2P.Message.NodeList - alias Archethic.P2P.Message.NotFound - alias Archethic.P2P.Message.NotifyEndOfNodeSync - alias Archethic.P2P.Message.NotifyLastTransactionAddress - alias Archethic.P2P.Message.NotifyPreviousChain - alias Archethic.P2P.Message.Ok - alias Archethic.P2P.Message.P2PView - alias Archethic.P2P.Message.Ping - alias Archethic.P2P.Message.RegisterBeaconUpdates - alias Archethic.P2P.Message.ReplicateTransaction - alias Archethic.P2P.Message.ReplicateTransactionChain - alias Archethic.P2P.Message.StartMining - alias Archethic.P2P.Message.StartMining - alias Archethic.P2P.Message.TransactionChainLength - alias Archethic.P2P.Message.TransactionInputList - alias Archethic.P2P.Message.TransactionList - alias Archethic.P2P.Message.UnspentOutputList - alias Archethic.P2P.Node - - alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.Transaction.CrossValidationStamp - alias Archethic.TransactionChain.Transaction.ValidationStamp - alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations - alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput - - alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput - - alias Archethic.TransactionChain.TransactionData - alias Archethic.TransactionChain.TransactionInput - alias Archethic.TransactionChain.VersionedTransactionInput + alias Archethic.{ + Crypto, + P2P.Message, + P2P.Node + } + + alias Archethic.P2P.Message.{ + AcknowledgeStorage, + AddMiningContext, + Balance, + BootstrappingNodes, + CrossValidate, + CrossValidationDone, + EncryptedStorageNonce, + Error, + FirstPublicKey, + GetBalance, + GetBootstrappingNodes, + GetFirstPublicKey, + GetLastTransaction, + GetLastTransactionAddress, + GetP2PView, + GetStorageNonce, + GetTransaction, + GetTransactionChain, + GetTransactionChainLength, + GetTransactionInputs, + GetTransactionSummary, + GetUnspentOutputs, + LastTransactionAddress, + ListNodes, + NewTransaction, + NodeList, + NotFound, + NotifyEndOfNodeSync, + NotifyLastTransactionAddress, + NotifyPreviousChain, + Ok, + P2PView, + Ping, + RegisterBeaconUpdates, + ReplicateTransaction, + ReplicateTransactionChain, + StartMining, + ShardRepair, + TransactionChainLength, + TransactionInputList, + TransactionList, + UnspentOutputList + } + + alias Archethic.TransactionChain.{ + TransactionData, + TransactionInput, + VersionedTransactionInput, + Transaction, + Transaction.CrossValidationStamp, + Transaction.ValidationStamp, + Transaction.ValidationStamp.LedgerOperations, + Transaction.ValidationStamp.LedgerOperations.UnspentOutput, + Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput + } doctest Message @@ -980,6 +984,23 @@ defmodule Archethic.P2P.MessageTest do |> Message.decode() |> elem(0) end + + test "%ShardRepair" do + msg = %ShardRepair{ + first_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + storage_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + io_addresses: [ + <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + ] + } + + assert msg == + msg + |> Message.encode() + |> Message.decode() + |> elem(0) + end end test "get_timeout should return timeout according to message type" do diff --git a/test/archethic/replication_test.exs b/test/archethic/replication_test.exs index a62e14be7..fa95916df 100644 --- a/test/archethic/replication_test.exs +++ b/test/archethic/replication_test.exs @@ -255,7 +255,7 @@ defmodule Archethic.ReplicationTest do |> expect(:get_transaction, fn _, _ -> {:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: DateTime.utc_now()}}} end) - |> expect(:list_chain_addresses, fn _ -> [{"@Alice1", 0}] end) + |> expect(:list_chain_addresses, fn _ -> [{"@Alice1", DateTime.utc_now()}] end) MockClient |> stub(:send_message, fn _, diff --git a/test/archethic/self_repair/notifier_test.exs b/test/archethic/self_repair/notifier_test.exs index 5b073b839..9c0b65e3d 100644 --- a/test/archethic/self_repair/notifier_test.exs +++ b/test/archethic/self_repair/notifier_test.exs @@ -1,88 +1,259 @@ defmodule Archethic.SelfRepair.NotifierTest do + @moduledoc false use ArchethicCase + import Mox + + alias Archethic.BeaconChain.SummaryAggregate + alias Archethic.BeaconChain.SummaryTimer + alias Archethic.Crypto + alias Archethic.Election + alias Archethic.P2P + alias Archethic.P2P.Message.GetBeaconSummariesAggregate alias Archethic.P2P.Message.Ok - alias Archethic.P2P.Message.ReplicateTransaction + alias Archethic.P2P.Message.ShardRepair alias Archethic.P2P.Node alias Archethic.SelfRepair.Notifier + alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.Transaction.ValidationStamp + alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations - import Mox + alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations - test "when a node is becoming offline new nodes should receive transaction to replicate" do - P2P.add_and_connect_node(%Node{ - first_public_key: Crypto.first_node_public_key(), - last_public_key: Crypto.first_node_public_key(), - ip: {127, 0, 0, 1}, - port: 3000, + test "new_storage_nodes/2 should return new election" do + node1 = %Node{ + first_public_key: "node1", + last_public_key: "node1", authorized?: true, - authorization_date: ~U[2022-02-01 00:00:00Z], + authorization_date: DateTime.utc_now(), + available?: true, geo_patch: "AAA" - }) + } - P2P.add_and_connect_node(%Node{ + node2 = %Node{ first_public_key: "node2", last_public_key: "node2", - ip: {127, 0, 0, 1}, - port: 3001, authorized?: true, - authorization_date: ~U[2022-02-01 00:00:00Z], - geo_patch: "CCC" - }) + authorization_date: DateTime.utc_now(), + available?: true, + geo_patch: "AAA" + } - P2P.add_and_connect_node(%Node{ + node3 = %Node{ first_public_key: "node3", last_public_key: "node3", - ip: {127, 0, 0, 1}, - port: 3002, authorized?: true, - authorization_date: ~U[2022-02-03 00:00:00Z], - geo_patch: "DDD" - }) + authorization_date: DateTime.utc_now(), + available?: true, + geo_patch: "AAA" + } - {:ok, pid} = Notifier.start_link() + node4 = %Node{ + first_public_key: "node4", + last_public_key: "node4", + authorized?: true, + authorization_date: DateTime.utc_now(), + available?: true, + geo_patch: "AAA" + } - MockDB - |> expect(:list_transactions, fn _ -> - [ - %Transaction{ - address: "@Alice1", - type: :transfer, - validation_stamp: %ValidationStamp{ - timestamp: ~U[2022-02-01 12:54:00Z] - } + P2P.add_and_connect_node(node1) + P2P.add_and_connect_node(node2) + P2P.add_and_connect_node(node3) + P2P.add_and_connect_node(node4) + + prev_storage_nodes = ["node2", "node3"] + new_available_nodes = [node1, node2, node3, node4] + + assert {"Alice1", ["node4", "node1"], []} = + Notifier.new_storage_nodes( + {"Alice1", [], prev_storage_nodes, []}, + new_available_nodes + ) + end + + test "map_last_address_for_node/1 should create a map with last address for each node" do + tab = [ + {"Alice1", ["node1"], ["node3"]}, + {"Alice2", [], ["node1"]}, + {"Alice3", ["node1"], ["node3"]}, + {"Alice4", ["node4"], ["node2"]}, + {"Alice5", ["node3"], []} + ] + + expected = %{ + "node1" => %{last_address: "Alice3", io_addresses: ["Alice2"]}, + "node2" => %{last_address: nil, io_addresses: ["Alice4"]}, + "node3" => %{last_address: "Alice5", io_addresses: ["Alice3", "Alice1"]}, + "node4" => %{last_address: "Alice4", io_addresses: []} + } + + assert ^expected = Notifier.map_last_addresses_for_node(tab) + end + + test "repair_transactions/3 should send message to new storage nodes" do + node = %Node{ + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.last_node_public_key(), + authorized?: true, + authorization_date: DateTime.utc_now(), + available?: true, + geo_patch: "AAA" + } + + P2P.add_and_connect_node(node) + + prev_available_nodes = + Enum.map(1..50, fn nb -> + node = %Node{ + first_public_key: "node#{nb}", + last_public_key: "node#{nb}", + authorized?: true, + authorization_date: DateTime.utc_now(), + available?: true, + geo_patch: "#{Integer.to_string(nb, 16)}A" } - ] + + P2P.add_and_connect_node(node) + + node + end) + + prev_available_nodes = [node | prev_available_nodes] + + # Take nodes in election of Alice2 but not in the one of Alice3 + elec1 = Election.chain_storage_nodes("Alice2", prev_available_nodes) + elec2 = Election.chain_storage_nodes("Alice3", prev_available_nodes) + + diff_nodes = elec1 -- elec2 + + unavailable_nodes = Enum.take(diff_nodes, 2) |> Enum.map(& &1.first_public_key) + + new_available_nodes = + Enum.reject(prev_available_nodes, &(&1.first_public_key in unavailable_nodes)) + + # New possible storage nodes for Alice2 + new_possible_nodes = (prev_available_nodes -- elec1) |> Enum.map(& &1.first_public_key) + + MockDB + |> stub(:stream_first_addresses, fn -> ["Alice1"] end) + |> stub(:get_transaction_chain, fn + "Alice1", _, _ -> + {[ + %Transaction{ + address: "Alice1", + validation_stamp: %ValidationStamp{ + ledger_operations: %LedgerOperations{transaction_movements: []} + } + }, + %Transaction{ + address: "Alice2", + validation_stamp: %ValidationStamp{ + ledger_operations: %LedgerOperations{transaction_movements: []} + } + }, + %Transaction{ + address: "Alice3", + validation_stamp: %ValidationStamp{ + ledger_operations: %LedgerOperations{transaction_movements: []} + } + } + ], false, nil} end) me = self() MockClient - |> expect(:send_message, fn %Node{first_public_key: "node3"}, - %ReplicateTransaction{ - transaction: %Transaction{address: "@Alice1"} - }, - _ -> - send(me, :tx_replicated) - %Ok{} + |> stub(:send_message, fn + node, %ShardRepair{first_address: "Alice1", storage_address: "Alice2"}, _ -> + if Enum.member?(new_possible_nodes, node.first_public_key) do + send(me, :new_node) + end + + _, _, _ -> + {:ok, %Ok{}} end) - send( - pid, - {:node_update, - %Node{ - first_public_key: "node2", - available?: false, - authorized?: true, - authorization_date: ~U[2022-02-01 00:00:00Z] - }} - ) - - assert_receive :tx_replicated + Notifier.repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes) + + # Expect to receive only 1 new node for Alice2 + assert_receive :new_node + refute_receive :new_node, 200 + end + + test "repair_summaries_aggregate/2 should store beacon aggregate" do + enrollment_date = DateTime.utc_now() |> DateTime.add(-10, :minute) + + node = %Node{ + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.last_node_public_key(), + geo_patch: "AAA", + network_patch: "AAA", + authorized?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-11, :minute), + available?: true, + enrollment_date: enrollment_date, + availability_history: <<1::1>> + } + + P2P.add_and_connect_node(node) + + nodes = + Enum.map(1..9, fn nb -> + %Node{ + first_public_key: "node#{nb}", + last_public_key: "node#{nb}", + geo_patch: "AAA", + network_patch: "AAA", + authorized?: true, + authorization_date: DateTime.utc_now(), + available?: true, + availability_history: <<1::1>> + } + end) + + nodes = [node | nodes] + + start_supervised!({SummaryTimer, interval: "0 * * * *"}) + + [first_date | rest] = SummaryTimer.next_summaries(enrollment_date) |> Enum.to_list() + random_date = Enum.random(rest) + + me = self() + + MockDB + |> stub(:get_beacon_summaries_aggregate, fn + summary_time when summary_time in [first_date, random_date] -> + {:error, :not_exists} + + summary_time -> + {:ok, %SummaryAggregate{summary_time: summary_time}} + end) + |> expect(:write_beacon_summaries_aggregate, 2, fn + %SummaryAggregate{summary_time: summary_time} when summary_time == first_date -> + send(me, :write_first_date) + + %SummaryAggregate{summary_time: summary_time} when summary_time == random_date -> + send(me, :write_random_date) + + _ -> + send(me, :unexpected) + end) + + MockClient + |> stub(:send_message, fn _, %GetBeaconSummariesAggregate{date: summary_time}, _ -> + {:ok, %SummaryAggregate{summary_time: summary_time}} + end) + + Notifier.repair_summaries_aggregate(nodes, nodes) + + assert_receive :write_first_date + assert_receive :write_random_date + refute_receive :unexpected end end diff --git a/test/archethic/self_repair/repair_worker_test.exs b/test/archethic/self_repair/repair_worker_test.exs new file mode 100644 index 000000000..487520ae2 --- /dev/null +++ b/test/archethic/self_repair/repair_worker_test.exs @@ -0,0 +1,164 @@ +defmodule Archethic.SelfRepair.RepairWorkerTest do + @moduledoc false + use ArchethicCase + + alias Archethic.BeaconChain.SummaryTimer + + alias Archethic.Crypto + + alias Archethic.P2P + alias Archethic.P2P.Client.DefaultImpl + alias Archethic.P2P.Node + alias Archethic.P2P.Message.GetNextAddresses + alias Archethic.P2P.Message.GetTransaction + + alias Archethic.SelfRepair.RepairWorker + + alias Archethic.TransactionChain.Transaction + alias Archethic.TransactionChain.Transaction.ValidationStamp + + import Mox + + setup do + start_supervised!({SummaryTimer, interval: "0 0 * * *"}) + + :ok + end + + test "start_link/1 should start a new worker and create a task to replicate transaction" do + {:ok, pid} = + RepairWorker.start_link( + first_address: "Alice1", + storage_address: "Alice2", + io_addresses: ["Bob1"] + ) + + assert %{storage_addresses: [], io_addresses: ["Bob1"], task: _task_pid} = :sys.get_state(pid) + end + + test "repair_task/3 replicate a transaction if it does not already exists" do + P2P.add_and_connect_node(%Node{ + first_public_key: "node1", + last_public_key: "node1", + geo_patch: "AAA", + authorized?: true, + authorization_date: ~U[2022-11-27 00:00:00Z], + available?: true + }) + + {:ok, pid} = + RepairWorker.start_link( + first_address: "Alice1", + storage_address: "Alice2", + io_addresses: ["Bob1", "Bob2"] + ) + + me = self() + + MockDB + |> stub(:transaction_exists?, fn + "Bob2" -> + send(me, :exists_bob3) + true + + _ -> + false + end) + + MockClient + |> stub(:send_message, fn + _, %GetTransaction{address: "Alice2"}, _ -> + send(me, :get_tx_alice2) + + _, %GetTransaction{address: "Bob1"}, _ -> + send(me, :get_tx_bob1) + + _, %GetTransaction{address: "Bob2"}, _ -> + send(me, :get_tx_bob2) + end) + + assert_receive :get_tx_alice2 + assert_receive :get_tx_bob1 + + assert_receive :exists_bob3 + refute_receive :get_tx_bob2 + + assert not Process.alive?(pid) + end + + test "add_message/1 should add new addresses in GenServer state" do + MockDB + |> stub(:transaction_exists?, fn _ -> Process.sleep(100) end) + + {:ok, pid} = + RepairWorker.start_link( + first_address: "Alice1", + storage_address: "Alice2", + io_addresses: ["Bob1", "Bob2"] + ) + + assert %{ + storage_addresses: [], + io_addresses: ["Bob1", "Bob2"], + task: _task_pid + } = :sys.get_state(pid) + + GenServer.cast(pid, {:add_address, "Alice4", ["Bob2", "Bob3"]}) + GenServer.cast(pid, {:add_address, "Alice3", []}) + GenServer.cast(pid, {:add_address, nil, ["Bob4"]}) + + assert %{ + storage_addresses: ["Alice3", "Alice4"], + io_addresses: ["Bob1", "Bob2", "Bob3", "Bob4"], + task: _task_pid + } = :sys.get_state(pid) + end + + test "update_last_address/1 should request missing addresses and add them in DB" do + node = %Node{ + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.last_node_public_key(), + geo_patch: "AAA", + authorized?: true, + authorization_date: ~U[2022-11-27 00:00:00Z], + available?: true, + availability_history: <<1::1>> + } + + me = self() + + MockDB + |> expect(:get_last_chain_address, fn "Alice2" -> {"Alice2", ~U[2022-11-27 00:10:00Z]} end) + |> expect(:get_transaction, fn "Alice2", _ -> + {:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: ~U[2022-11-27 00:10:00Z]}}} + end) + |> expect(:get_first_chain_address, 2, fn "Alice2" -> "Alice0" end) + |> expect(:list_chain_addresses, fn "Alice0" -> + [ + {"Alice1", ~U[2022-11-27 00:09:00Z]}, + {"Alice2", ~U[2022-11-27 00:10:00Z]}, + {"Alice3", ~U[2022-11-27 00:11:00Z]}, + {"Alice4", ~U[2022-11-27 00:12:00Z]} + ] + end) + |> expect(:add_last_transaction_address, 2, fn + "Alice0", "Alice3", ~U[2022-11-27 00:11:00Z] -> + send(me, :add_alice3) + + "Alice0", "Alice4", ~U[2022-11-27 00:12:00Z] -> + send(me, :add_alice4) + end) + + MockClient + |> expect(:send_message, fn node, msg = %GetNextAddresses{address: "Alice2"}, timeout -> + send(me, :get_next_addresses) + DefaultImpl.send_message(node, msg, timeout) + end) + + RepairWorker.update_last_address("Alice2", [node]) + + assert_receive :get_next_addresses + assert_receive :add_alice3 + assert_receive :add_alice4 + end +end diff --git a/test/archethic_web/live/rewards_live_test.exs b/test/archethic_web/live/rewards_live_test.exs index 639957eb7..4d64158eb 100644 --- a/test/archethic_web/live/rewards_live_test.exs +++ b/test/archethic_web/live/rewards_live_test.exs @@ -37,7 +37,6 @@ defmodule ArchethicWeb.RewardsLiveTest do time = DateTime.utc_now() |> DateTime.add(3600 * index, :second) - |> DateTime.to_unix() {address, time} end)