diff --git a/lib/archethic.ex b/lib/archethic.ex index 1855a229c..058fee74e 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -11,6 +11,7 @@ defmodule Archethic do alias __MODULE__.Mining alias __MODULE__.P2P + alias __MODULE__.P2P.Node alias __MODULE__.DB @@ -19,7 +20,6 @@ defmodule Archethic do alias __MODULE__.P2P.Message.NewTransaction alias __MODULE__.P2P.Message.Ok alias __MODULE__.P2P.Message.StartMining - alias __MODULE__.P2P.Node alias __MODULE__.TransactionChain alias __MODULE__.TransactionChain.Transaction @@ -37,13 +37,7 @@ defmodule Archethic do | {:error, :network_issue} def search_transaction(address) when is_binary(address) do storage_nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes()) - - nodes = - storage_nodes - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) - - TransactionChain.fetch_transaction_remotely(address, nodes) + TransactionChain.fetch_transaction_remotely(address, storage_nodes) end @doc """ @@ -116,12 +110,7 @@ defmodule Archethic do def get_last_transaction(address) when is_binary(address) do case get_last_transaction_address(address) do {:ok, last_address} -> - nodes = - last_address - |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) - + nodes = Election.chain_storage_nodes(last_address, P2P.authorized_and_available_nodes()) TransactionChain.fetch_transaction_remotely(last_address, nodes) {:error, :network_issue} = e -> @@ -146,8 +135,6 @@ defmodule Archethic do def get_balance(address) when is_binary(address) do address |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) |> get_balance(address) end @@ -189,11 +176,7 @@ defmodule Archethic do end defp do_get_transaction_inputs(address) do - nodes = - address - |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) + nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes()) address |> TransactionChain.stream_inputs_remotely(nodes, DateTime.utc_now()) @@ -211,11 +194,7 @@ defmodule Archethic do def get_transaction_inputs(address, page, limit) when is_binary(address) and is_integer(page) and page >= 0 and is_integer(limit) and limit >= 0 do - nodes = - address - |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) + nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes()) {inputs, _more?, _offset} = TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now(), page, limit) @@ -228,11 +207,7 @@ defmodule Archethic do """ @spec get_transaction_chain(binary()) :: {:ok, list(Transaction.t())} | {:error, :network_issue} def get_transaction_chain(address) when is_binary(address) do - nodes = - address - |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) + nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes()) # We directly check if the transaction exists and retrieve the genesis # Otherwise we are requesting the genesis address remotly @@ -290,11 +265,7 @@ defmodule Archethic do @spec get_transaction_chain_by_paging_address(binary(), binary()) :: {:ok, list(Transaction.t())} | {:error, :network_issue} def get_transaction_chain_by_paging_address(address, paging_address) when is_binary(address) do - nodes = - address - |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) + nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes()) try do {local_chain, paging_address} = @@ -331,12 +302,7 @@ defmodule Archethic do @spec get_transaction_chain_length(binary()) :: {:ok, non_neg_integer()} | {:error, :network_issue} def get_transaction_chain_length(address) when is_binary(address) do - nodes = - address - |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) - + nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes()) TransactionChain.fetch_size_remotely(address, nodes) end end diff --git a/lib/archethic/bootstrap.ex b/lib/archethic/bootstrap.ex index ea258764a..e67aaf1bb 100644 --- a/lib/archethic/bootstrap.ex +++ b/lib/archethic/bootstrap.ex @@ -178,7 +178,7 @@ defmodule Archethic.Bootstrap do reward_address ) - post_bootstrap(patch: network_patch, sync?: true) + post_bootstrap(sync?: true) else if Sync.require_update?(ip, port, http_port, transport, last_sync_date) do Logger.info("Update node chain...") @@ -193,9 +193,9 @@ defmodule Archethic.Bootstrap do reward_address ) - post_bootstrap(patch: network_patch, sync?: true) + post_bootstrap(sync?: true) else - post_bootstrap(patch: network_patch, sync?: false) + post_bootstrap(sync?: false) end end end @@ -203,10 +203,8 @@ defmodule Archethic.Bootstrap do defp post_bootstrap(opts) do if Keyword.get(opts, :sync?, true) do - patch = Keyword.fetch!(opts, :patch) - Logger.info("Synchronization started") - :ok = SelfRepair.bootstrap_sync(SelfRepair.last_sync_date(), patch) + :ok = SelfRepair.bootstrap_sync(SelfRepair.last_sync_date()) Logger.info("Synchronization finished") end diff --git a/lib/archethic/mining/transaction_context.ex b/lib/archethic/mining/transaction_context.ex index 2895e5405..e6e9c76c4 100644 --- a/lib/archethic/mining/transaction_context.ex +++ b/lib/archethic/mining/transaction_context.ex @@ -88,14 +88,8 @@ defmodule Archethic.Mining.TransactionContext do end defp previous_nodes_distribution(previous_address, nb_sub_lists, sample_size) do - node_list = - P2P.unprioritize_node(P2P.authorized_and_available_nodes(), Crypto.first_node_public_key()) - elected_nodes = - previous_address - |> Election.chain_storage_nodes(node_list) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) + Election.chain_storage_nodes(previous_address, P2P.authorized_and_available_nodes()) if Enum.count(elected_nodes) < sample_size do Enum.map(1..nb_sub_lists, fn _ -> elected_nodes end) diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index 7144c89a2..3dfbe8fe0 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -573,7 +573,11 @@ defmodule Archethic.P2P do ) def quorum_read(nodes, message, conflict_resolver, timeout, consistency_level) do - do_quorum_read(nodes, message, conflict_resolver, timeout, consistency_level, nil) + nodes + |> Enum.filter(&Node.locally_available?/1) + |> nearest_nodes() + |> unprioritize_node(Crypto.first_node_public_key()) + |> do_quorum_read(message, conflict_resolver, timeout, consistency_level, nil) end defp do_quorum_read([], _, _, _, _, nil), do: {:error, :network_issue} diff --git a/lib/archethic/replication/transaction_context.ex b/lib/archethic/replication/transaction_context.ex index bd04f7d45..e6714a1fd 100644 --- a/lib/archethic/replication/transaction_context.ex +++ b/lib/archethic/replication/transaction_context.ex @@ -5,9 +5,6 @@ defmodule Archethic.Replication.TransactionContext do alias Archethic.Election - alias Archethic.P2P - alias Archethic.P2P.Node - alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.TransactionInput @@ -19,10 +16,10 @@ defmodule Archethic.Replication.TransactionContext do """ @spec fetch_transaction(address :: Crypto.versioned_hash(), list(Node.t())) :: Transaction.t() | nil - def fetch_transaction(address, download_nodes) when is_binary(address) do - nodes = replication_nodes(address, download_nodes) + def fetch_transaction(address, node_list) when is_binary(address) do + storage_nodes = Election.chain_storage_nodes(address, node_list) - case TransactionChain.fetch_transaction_remotely(address, nodes) do + case TransactionChain.fetch_transaction_remotely(address, storage_nodes) do {:ok, tx} -> tx @@ -36,16 +33,17 @@ defmodule Archethic.Replication.TransactionContext do """ @spec stream_transaction_chain(address :: Crypto.versioned_hash(), list(Node.t())) :: Enumerable.t() | list(Transaction.t()) - def stream_transaction_chain(address, download_nodes) when is_binary(address) do - case replication_nodes(address, download_nodes) do + def stream_transaction_chain(address, node_list) when is_binary(address) do + storage_nodes = Election.chain_storage_nodes(address, node_list) + paging_address = TransactionChain.get_last_local_address(address) + + case storage_nodes do [] -> [] - nodes -> - paging_address = TransactionChain.get_last_local_address(address) - + _ -> if paging_address != address do - TransactionChain.stream_remotely(address, nodes, paging_address) + TransactionChain.stream_remotely(address, storage_nodes, paging_address) else [] end @@ -57,27 +55,12 @@ defmodule Archethic.Replication.TransactionContext do """ @spec fetch_transaction_inputs(address :: Crypto.versioned_hash(), DateTime.t(), list(Node.t())) :: list(TransactionInput.t()) - def fetch_transaction_inputs(address, timestamp = %DateTime{}, download_nodes) + def fetch_transaction_inputs(address, timestamp = %DateTime{}, node_list) when is_binary(address) do - nodes = replication_nodes(address, download_nodes) + storage_nodes = Election.chain_storage_nodes(address, node_list) address - |> TransactionChain.stream_inputs_remotely(nodes, timestamp) + |> TransactionChain.stream_inputs_remotely(storage_nodes, timestamp) |> Enum.to_list() end - - defp replication_nodes(address, download_nodes) do - address - # returns the storage nodes for the transaction chain based on the transaction address - # from a list of available node - |> Election.chain_storage_nodes(download_nodes) - # Returns the nearest storages nodes from the local node as per the patch - # when the input is a list of nodes - |> P2P.nearest_nodes() - # Determine if the node is locally available based on its availability history. - # If the last exchange with node was succeed the node is considered as available - |> Enum.filter(&Node.locally_available?/1) - # Reorder a list of nodes to ensure the current node is only called at the end - |> P2P.unprioritize_node(Crypto.first_node_public_key()) - end end diff --git a/lib/archethic/self_repair.ex b/lib/archethic/self_repair.ex index 61287dd38..45dd86f8d 100755 --- a/lib/archethic/self_repair.ex +++ b/lib/archethic/self_repair.ex @@ -23,8 +23,8 @@ defmodule Archethic.SelfRepair do @doc """ Start the bootstrap's synchronization process using the last synchronization date """ - @spec bootstrap_sync(last_sync_date :: DateTime.t(), network_patch :: binary()) :: :ok - def bootstrap_sync(date = %DateTime{}, patch) when is_binary(patch) do + @spec bootstrap_sync(last_sync_date :: DateTime.t()) :: :ok + def bootstrap_sync(date = %DateTime{}) do # Loading transactions can take a lot of time to be achieve and can overpass an epoch. # So to avoid missing a beacon summary epoch, we save the starting date and update the last sync date with it # at the end of loading (in case there is a crash during self repair). @@ -36,7 +36,7 @@ defmodule Archethic.SelfRepair do # as no data have been aggregated if DateTime.diff(DateTime.utc_now(), summary_time) >= 0 do start_date = DateTime.utc_now() - :ok = Sync.load_missed_transactions(date, patch) + :ok = Sync.load_missed_transactions(date) put_last_sync_date(start_date) # At the end of self repair, if a new beacon summary as been created @@ -45,7 +45,7 @@ defmodule Archethic.SelfRepair do |> BeaconChain.previous_summary_time() |> DateTime.compare(start_date) do :gt -> - bootstrap_sync(start_date, patch) + bootstrap_sync(start_date) _ -> :ok diff --git a/lib/archethic/self_repair/scheduler.ex b/lib/archethic/self_repair/scheduler.ex index 98e60a34f..4e8f2114e 100644 --- a/lib/archethic/self_repair/scheduler.ex +++ b/lib/archethic/self_repair/scheduler.ex @@ -7,7 +7,6 @@ defmodule Archethic.SelfRepair.Scheduler do @vsn Mix.Project.config()[:version] alias Archethic.P2P - alias Archethic.P2P.Node alias Archethic.SelfRepair.Sync @@ -94,7 +93,7 @@ defmodule Archethic.SelfRepair.Scheduler do # So to avoid missing a beacon summary epoch, we save the starting date and update the last sync date with it # at the end of loading (in case there is a crash during self repair) start_date = DateTime.utc_now() - :ok = Sync.load_missed_transactions(last_sync_date, get_node_patch()) + :ok = Sync.load_missed_transactions(last_sync_date) {:ok, start_date} end) @@ -136,11 +135,6 @@ defmodule Archethic.SelfRepair.Scheduler do end end - defp get_node_patch do - %Node{network_patch: network_patch} = P2P.get_node_info() - network_patch - end - defp update_last_sync_date(date = %DateTime{}) do next_sync_date = Utils.truncate_datetime(date) :ok = Sync.store_last_sync_date(next_sync_date) diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 9062fbd1f..5d2e8b82c 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -101,11 +101,9 @@ defmodule Archethic.SelfRepair.Sync do Once retrieved, the transactions are downloaded and stored if not exists locally """ - @spec load_missed_transactions( - last_sync_date :: DateTime.t(), - patch :: binary() - ) :: :ok | {:error, :unreachable_nodes} - def load_missed_transactions(last_sync_date = %DateTime{}, patch) when is_binary(patch) do + @spec load_missed_transactions(last_sync_date :: DateTime.t()) :: + :ok | {:error, :unreachable_nodes} + def load_missed_transactions(last_sync_date = %DateTime{}) do last_summary_time = BeaconChain.previous_summary_time(DateTime.utc_now()) if DateTime.compare(last_summary_time, last_sync_date) == :gt do @@ -113,7 +111,7 @@ defmodule Archethic.SelfRepair.Sync do "Fetch missed transactions from last sync date: #{DateTime.to_string(last_sync_date)}" ) - do_load_missed_transactions(last_sync_date, last_summary_time, patch) + do_load_missed_transactions(last_sync_date, last_summary_time) else Logger.info("Already synchronized for #{DateTime.to_string(last_sync_date)}") @@ -122,7 +120,7 @@ defmodule Archethic.SelfRepair.Sync do end end - defp do_load_missed_transactions(last_sync_date, last_summary_time, patch) do + defp do_load_missed_transactions(last_sync_date, last_summary_time) do start = System.monotonic_time() download_nodes = P2P.authorized_and_available_nodes() @@ -137,7 +135,7 @@ defmodule Archethic.SelfRepair.Sync do summaries_aggregates |> Stream.concat([last_aggregate]) - |> Enum.each(&process_summary_aggregate(&1, patch, download_nodes)) + |> Enum.each(&process_summary_aggregate(&1, download_nodes)) :telemetry.execute([:archethic, :self_repair], %{duration: System.monotonic_time() - start}) Archethic.Bootstrap.NetworkConstraints.persist_genesis_address() @@ -176,7 +174,8 @@ defmodule Archethic.SelfRepair.Sync do # cannot be reached nodes = - P2P.authorized_nodes(summary_time) + summary_time + |> P2P.authorized_nodes() |> Enum.filter(& &1.available?) # If number of authorized node is <= 2 and current node is part of it @@ -225,14 +224,13 @@ defmodule Archethic.SelfRepair.Sync do At the end of the execution, the summaries aggregate will persisted locally if the node is member of the shard of the summary """ - @spec process_summary_aggregate(SummaryAggregate.t(), binary(), list(Node.t())) :: :ok + @spec process_summary_aggregate(SummaryAggregate.t(), list(Node.t())) :: :ok def process_summary_aggregate( aggregate = %SummaryAggregate{ summary_time: summary_time, transaction_summaries: transaction_summaries, p2p_availabilities: p2p_availabilities }, - node_patch, download_nodes ) do start_time = System.monotonic_time() @@ -242,7 +240,7 @@ defmodule Archethic.SelfRepair.Sync do |> Enum.reject(&TransactionChain.transaction_exists?(&1.address)) |> Enum.filter(&TransactionHandler.download_transaction?/1) - synchronize_transactions(transactions_to_sync, node_patch, download_nodes) + synchronize_transactions(transactions_to_sync, download_nodes) :telemetry.execute( [:archethic, :self_repair, :process_aggregate], @@ -275,16 +273,16 @@ defmodule Archethic.SelfRepair.Sync do store_aggregate(aggregate) end - defp synchronize_transactions([], _node_patch, _), do: :ok + defp synchronize_transactions([], _), do: :ok - defp synchronize_transactions(transaction_summaries, node_patch, download_nodes) do + defp synchronize_transactions(transaction_summaries, download_nodes) do Logger.info("Need to synchronize #{Enum.count(transaction_summaries)} transactions") Logger.debug("Transaction to sync #{inspect(transaction_summaries)}") Task.Supervisor.async_stream( TaskSupervisor, transaction_summaries, - &TransactionHandler.download_transaction(&1, node_patch, download_nodes), + &TransactionHandler.download_transaction(&1, download_nodes), on_timeout: :kill_task, timeout: Message.get_max_timeout() + 2000, max_concurrency: 100 diff --git a/lib/archethic/self_repair/sync/transaction_handler.ex b/lib/archethic/self_repair/sync/transaction_handler.ex index acaadfdaf..81198032b 100644 --- a/lib/archethic/self_repair/sync/transaction_handler.ex +++ b/lib/archethic/self_repair/sync/transaction_handler.ex @@ -6,7 +6,6 @@ defmodule Archethic.SelfRepair.Sync.TransactionHandler do alias Archethic.Election alias Archethic.P2P - alias Archethic.P2P.Node alias Archethic.Replication @@ -51,14 +50,12 @@ defmodule Archethic.SelfRepair.Sync.TransactionHandler do @doc """ Request the transaction for the closest storage nodes and replicate it locally. """ - @spec download_transaction(TransactionSummary.t(), patch :: binary(), list(Node.t())) :: + @spec download_transaction(TransactionSummary.t(), list(Node.t())) :: Transaction.t() def download_transaction( %TransactionSummary{address: address, type: type, timestamp: _timestamp}, - node_patch, - download_nodes - ) - when is_binary(node_patch) do + node_list + ) do Logger.info("Synchronize missed transaction", transaction_address: Base.encode16(address), transaction_type: type @@ -66,10 +63,8 @@ defmodule Archethic.SelfRepair.Sync.TransactionHandler do storage_nodes = address - |> Election.chain_storage_nodes_with_type(type, download_nodes) + |> Election.chain_storage_nodes_with_type(type, node_list) |> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key())) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) case TransactionChain.fetch_transaction_remotely(address, storage_nodes) do {:ok, tx = %Transaction{}} -> @@ -100,13 +95,13 @@ defmodule Archethic.SelfRepair.Sync.TransactionHandler do address: address, type: type }, - download_nodes + node_list ) do - node_list = [P2P.get_node_info() | download_nodes] |> P2P.distinct_nodes() + node_list = [P2P.get_node_info() | node_list] |> P2P.distinct_nodes() cond do Election.chain_storage_node?(address, type, Crypto.first_node_public_key(), node_list) -> - Replication.validate_and_store_transaction_chain(tx, true, download_nodes) + Replication.validate_and_store_transaction_chain(tx, true, node_list) Election.io_storage_node?(tx, Crypto.first_node_public_key(), node_list) -> Replication.validate_and_store_transaction(tx, true) diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 65014bf20..a6d00ef8f 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -554,11 +554,7 @@ defmodule Archethic.TransactionChain do @spec resolve_last_address(binary(), DateTime.t()) :: {:ok, binary()} | {:error, :network_issue} def resolve_last_address(address, timestamp = %DateTime{} \\ DateTime.utc_now()) when is_binary(address) do - nodes = - address - |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) + nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes()) case fetch_last_address_remotely(address, nodes, timestamp) do {:ok, last_address} -> @@ -930,11 +926,7 @@ defmodule Archethic.TransactionChain do @spec fetch_genesis_address_remotely(address :: binary()) :: {:ok, binary()} | {:error, :network_issue} def fetch_genesis_address_remotely(address) when is_binary(address) do - nodes = - address - |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) + nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes()) case P2P.quorum_read(nodes, %GetFirstAddress{address: address}) do {:ok, %FirstAddress{address: genesis_address}} -> diff --git a/lib/archethic_web/controllers/api/transaction_controller.ex b/lib/archethic_web/controllers/api/transaction_controller.ex index f49512b6a..3cf6f258b 100644 --- a/lib/archethic_web/controllers/api/transaction_controller.ex +++ b/lib/archethic_web/controllers/api/transaction_controller.ex @@ -18,7 +18,6 @@ defmodule ArchethicWeb.API.TransactionController do NotFound } - alias Archethic.P2P.Node alias Archethic.Election alias Archethic.Mining @@ -43,13 +42,8 @@ defmodule ArchethicWeb.API.TransactionController do storage_nodes = Election.chain_storage_nodes(tx_address, P2P.authorized_and_available_nodes()) - nodes = - storage_nodes - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) - case P2P.quorum_read( - nodes, + storage_nodes, %GetTransactionSummary{address: tx_address} ) do {:ok, %TransactionSummary{address: ^tx_address}} -> diff --git a/lib/archethic_web/live/chains/beacon_live.ex b/lib/archethic_web/live/chains/beacon_live.ex index 8ed5b04ec..a073765a8 100644 --- a/lib/archethic_web/live/chains/beacon_live.ex +++ b/lib/archethic_web/live/chains/beacon_live.ex @@ -247,8 +247,6 @@ defmodule ArchethicWeb.BeaconChainLive do # Slots which are already has been added # Real time transaction can be get from pubsub def list_transactions_from_current_slots(date = %DateTime{} \\ DateTime.utc_now()) do - %Node{network_patch: patch} = P2P.get_node_info() - authorized_nodes = P2P.authorized_and_available_nodes() ref_time = DateTime.truncate(date, :millisecond) @@ -262,7 +260,7 @@ defmodule ArchethicWeb.BeaconChainLive do subset |> Election.beacon_storage_nodes(next_summary_date, authorized_nodes) |> Enum.filter(&Node.locally_available?/1) - |> P2P.nearest_nodes(patch) + |> P2P.nearest_nodes() |> Enum.take(3) |> Enum.map(&{&1, subset}) end) @@ -272,9 +270,9 @@ defmodule ArchethicWeb.BeaconChainLive do # We aggregate the subsets for a given node Map.update(acc, node, [subset], &[subset | &1]) end) - |> Flow.flat_map(fn {node, addresses} -> + |> Flow.flat_map(fn {node, subsets} -> # For this node we fetch the summaries - fetch_summaries(node, addresses) + fetch_summaries(node, subsets) end) |> Stream.uniq_by(& &1.address) |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) diff --git a/lib/archethic_web/live/top_transactions_component.ex b/lib/archethic_web/live/top_transactions_component.ex index 37134b41d..647a9957c 100644 --- a/lib/archethic_web/live/top_transactions_component.ex +++ b/lib/archethic_web/live/top_transactions_component.ex @@ -97,8 +97,6 @@ defmodule ArchethicWeb.ExplorerIndexLive.TopTransactionsComponent do end defp list_transactions_from_current_slots(date = %DateTime{} \\ DateTime.utc_now()) do - %Node{network_patch: patch} = P2P.get_node_info() - authorized_nodes = P2P.authorized_and_available_nodes() ref_time = DateTime.truncate(date, :millisecond) @@ -111,7 +109,7 @@ defmodule ArchethicWeb.ExplorerIndexLive.TopTransactionsComponent do subset |> Election.beacon_storage_nodes(next_summary_date, authorized_nodes) |> Enum.filter(&Node.locally_available?/1) - |> P2P.nearest_nodes(patch) + |> P2P.nearest_nodes() |> Enum.take(3) |> Enum.map(&{&1, subset}) end) @@ -121,9 +119,9 @@ defmodule ArchethicWeb.ExplorerIndexLive.TopTransactionsComponent do # We aggregate the subsets for a given node Map.update(acc, node, [subset], &[subset | &1]) end) - |> Flow.flat_map(fn {node, addresses} -> + |> Flow.flat_map(fn {node, subsets} -> # For this node we fetch the summaries - fetch_summaries(node, addresses) + fetch_summaries(node, subsets) end) |> Stream.uniq_by(& &1.address) |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) diff --git a/test/archethic/p2p_test.exs b/test/archethic/p2p_test.exs index 04b3b5374..563a97045 100644 --- a/test/archethic/p2p_test.exs +++ b/test/archethic/p2p_test.exs @@ -43,31 +43,51 @@ defmodule Archethic.P2PTest do ip: {127, 0, 0, 1}, port: 3002, first_public_key: pub1, - last_public_key: pub1 + last_public_key: pub1, + available?: true, + availability_history: <<1::1>>, + network_patch: "AAA", + geo_patch: "AAA" }, %Node{ ip: {127, 0, 0, 1}, port: 3003, first_public_key: pub2, - last_public_key: pub2 + last_public_key: pub2, + available?: true, + availability_history: <<1::1>>, + network_patch: "AAA", + geo_patch: "AAA" }, %Node{ ip: {127, 0, 0, 1}, port: 3004, first_public_key: pub3, - last_public_key: pub3 + last_public_key: pub3, + available?: true, + availability_history: <<1::1>>, + network_patch: "AAA", + geo_patch: "AAA" }, %Node{ ip: {127, 0, 0, 1}, port: 3005, first_public_key: pub4, - last_public_key: pub4 + last_public_key: pub4, + available?: true, + availability_history: <<1::1>>, + network_patch: "AAA", + geo_patch: "AAA" }, %Node{ ip: {127, 0, 0, 1}, port: 3006, first_public_key: pub5, - last_public_key: pub5 + last_public_key: pub5, + available?: true, + availability_history: <<1::1>>, + network_patch: "AAA", + geo_patch: "AAA" } ] diff --git a/test/archethic/self_repair/sync/transaction_handler_test.exs b/test/archethic/self_repair/sync/transaction_handler_test.exs index 4ae970ae8..23eb71b6f 100644 --- a/test/archethic/self_repair/sync/transaction_handler_test.exs +++ b/test/archethic/self_repair/sync/transaction_handler_test.exs @@ -126,7 +126,6 @@ defmodule Archethic.SelfRepair.Sync.TransactionHandlerTest do assert ^tx = TransactionHandler.download_transaction( tx_summary, - "AAA", P2P.authorized_and_available_nodes() ) end diff --git a/test/archethic/self_repair/sync_test.exs b/test/archethic/self_repair/sync_test.exs index 07772738d..083308030 100644 --- a/test/archethic/self_repair/sync_test.exs +++ b/test/archethic/self_repair/sync_test.exs @@ -263,11 +263,7 @@ defmodule Archethic.SelfRepair.SyncTest do MockDB |> stub(:register_stats, fn _, _, _, _ -> :ok end) - assert :ok = - Sync.load_missed_transactions( - DateTime.utc_now() |> DateTime.add(-86_400), - "AAA" - ) + assert :ok = Sync.load_missed_transactions(DateTime.utc_now() |> DateTime.add(-86_400)) assert_received :storage end @@ -351,7 +347,6 @@ defmodule Archethic.SelfRepair.SyncTest do } ] }, - "AAA", P2P.authorized_and_available_nodes() ) diff --git a/test/archethic/transaction_chain_test.exs b/test/archethic/transaction_chain_test.exs index bbc8e7453..c55dda0f9 100644 --- a/test/archethic/transaction_chain_test.exs +++ b/test/archethic/transaction_chain_test.exs @@ -117,19 +117,23 @@ defmodule Archethic.TransactionChainTest do first_public_key: "node1", last_public_key: "node1", ip: {127, 0, 0, 1}, - port: 3000 + port: 3000, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node2", last_public_key: "node2", ip: {127, 0, 0, 1}, - port: 3001 + port: 3001, + availability_history: <<1::1>> }, %Node{ first_public_key: "node3", last_public_key: "node3", ip: {127, 0, 0, 1}, - port: 3002 + port: 3002, + availability_history: <<1::1>> } ] @@ -149,19 +153,25 @@ defmodule Archethic.TransactionChainTest do first_public_key: "node1", last_public_key: "node1", ip: {127, 0, 0, 1}, - port: 3000 + port: 3000, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node2", last_public_key: "node2", ip: {127, 0, 0, 1}, - port: 3001 + port: 3001, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node3", last_public_key: "node3", ip: {127, 0, 0, 1}, - port: 3002 + port: 3002, + available?: true, + availability_history: <<1::1>> } ] @@ -190,19 +200,25 @@ defmodule Archethic.TransactionChainTest do first_public_key: "node1", last_public_key: "node1", ip: {127, 0, 0, 1}, - port: 3000 + port: 3000, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node2", last_public_key: "node2", ip: {127, 0, 0, 1}, - port: 3001 + port: 3001, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node3", last_public_key: "node3", ip: {127, 0, 0, 1}, - port: 3002 + port: 3002, + available?: true, + availability_history: <<1::1>> } ] @@ -235,19 +251,25 @@ defmodule Archethic.TransactionChainTest do first_public_key: "node1", last_public_key: "node1", ip: {127, 0, 0, 1}, - port: 3000 + port: 3000, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node2", last_public_key: "node2", ip: {127, 0, 0, 1}, - port: 3001 + port: 3001, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node3", last_public_key: "node3", ip: {127, 0, 0, 1}, - port: 3002 + port: 3002, + available?: true, + availability_history: <<1::1>> } ] @@ -302,19 +324,25 @@ defmodule Archethic.TransactionChainTest do first_public_key: "node1", last_public_key: "node1", ip: {127, 0, 0, 1}, - port: 3000 + port: 3000, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node2", last_public_key: "node2", ip: {127, 0, 0, 1}, - port: 3001 + port: 3001, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node3", last_public_key: "node3", ip: {127, 0, 0, 1}, - port: 3002 + port: 3002, + available?: true, + availability_history: <<1::1>> } ] @@ -349,19 +377,25 @@ defmodule Archethic.TransactionChainTest do first_public_key: "node1", last_public_key: "node1", ip: {127, 0, 0, 1}, - port: 3000 + port: 3000, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node2", last_public_key: "node2", ip: {127, 0, 0, 1}, - port: 3001 + port: 3001, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node3", last_public_key: "node3", ip: {127, 0, 0, 1}, - port: 3002 + port: 3002, + available?: true, + availability_history: <<1::1>> } ] @@ -427,19 +461,25 @@ defmodule Archethic.TransactionChainTest do first_public_key: "node1", last_public_key: "node1", ip: {127, 0, 0, 1}, - port: 3000 + port: 3000, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node2", last_public_key: "node2", ip: {127, 0, 0, 1}, - port: 3001 + port: 3001, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node3", last_public_key: "node3", ip: {127, 0, 0, 1}, - port: 3002 + port: 3002, + available?: true, + availability_history: <<1::1>> } ] @@ -475,19 +515,25 @@ defmodule Archethic.TransactionChainTest do first_public_key: "node1", last_public_key: "node1", ip: {127, 0, 0, 1}, - port: 3000 + port: 3000, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node2", last_public_key: "node2", ip: {127, 0, 0, 1}, - port: 3001 + port: 3001, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node3", last_public_key: "node3", ip: {127, 0, 0, 1}, - port: 3002 + port: 3002, + available?: true, + availability_history: <<1::1>> } ] @@ -556,19 +602,25 @@ defmodule Archethic.TransactionChainTest do first_public_key: "node1", last_public_key: "node1", ip: {127, 0, 0, 1}, - port: 3000 + port: 3000, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node2", last_public_key: "node2", ip: {127, 0, 0, 1}, - port: 3001 + port: 3001, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node3", last_public_key: "node3", ip: {127, 0, 0, 1}, - port: 3002 + port: 3002, + available?: true, + availability_history: <<1::1>> } ] @@ -588,19 +640,25 @@ defmodule Archethic.TransactionChainTest do first_public_key: "node1", last_public_key: "node1", ip: {127, 0, 0, 1}, - port: 3000 + port: 3000, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node2", last_public_key: "node2", ip: {127, 0, 0, 1}, - port: 3001 + port: 3001, + available?: true, + availability_history: <<1::1>> }, %Node{ first_public_key: "node3", last_public_key: "node3", ip: {127, 0, 0, 1}, - port: 3002 + port: 3002, + available?: true, + availability_history: <<1::1>> } ]