diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index 9a057c2eb4..5684887e90 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -228,12 +228,13 @@ defmodule Archethic.BeaconChain do @doc """ Return a list of beacon summaries from a list of transaction addresses """ - @spec get_beacon_summaries(list(binary)) :: Enumerable.t() | list(Summary.t()) + @spec get_beacon_summaries(list(binary)) :: list(Summary.t()) def get_beacon_summaries(addresses) do addresses |> Stream.map(&get_summary/1) |> Stream.reject(&match?({:error, :not_found}, &1)) |> Stream.map(fn {:ok, summary} -> summary end) + |> Enum.to_list() end @doc """ @@ -342,15 +343,28 @@ defmodule Archethic.BeaconChain do defp fetch_summaries(node, addresses) do addresses |> Stream.chunk_every(10) - |> Stream.flat_map(fn addresses -> - case P2P.send_message(node, %GetBeaconSummaries{addresses: addresses}) do - {:ok, %BeaconSummaryList{summaries: summaries}} -> - summaries - - _ -> - [] - end - end) + |> Stream.flat_map(&batch_summaries_fetching(&1, node)) |> Enum.to_list() end + + defp batch_summaries_fetching(addresses, node) do + %{summaries: local_summaries, remaining: remaining} = + Enum.reduce(addresses, %{summaries: [], remaining: []}, fn addr, acc -> + case get_summary(addr) do + {:ok, summary} -> + Map.update!(acc, :summaries, &[summary | &1]) + + _ -> + Map.update!(acc, :remaining, &[addr | &1]) + end + end) + + case P2P.send_message(node, %GetBeaconSummaries{addresses: remaining}) do + {:ok, %BeaconSummaryList{summaries: fetched_summaries}} -> + local_summaries ++ fetched_summaries + + _ -> + local_summaries + end + end end diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index ef28d3a91f..4777c8cc3e 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -177,12 +177,12 @@ defmodule Archethic.P2P.Message do get_max_timeout() * 10 end - def get_timeout(%GetBeaconSummaries{addresses: addresses}) do - # We can expect high beacon summary where a transaction replication will contains a single UCO transfer - # CALC: Tx address + recipient address + tx type + tx timestamp + storage node public key + signature * 200 (max storage nodes) - beacon_summary_high_estimation_bytes = 34 + 34 + 1 + 8 + (8 + 34 + 34 * 200) - length(addresses) * trunc(beacon_summary_high_estimation_bytes / @floor_upload_speed * 1000) - end + # def get_timeout(%GetBeaconSummaries{addresses: addresses}) do + # # We can expect high beacon summary where a transaction replication will contains a single UCO transfer + # # CALC: Tx address + recipient address + tx type + tx timestamp + storage node public key + signature * 200 (max storage nodes) + # beacon_summary_high_estimation_bytes = 34 + 34 + 1 + 8 + (8 + 34 + 34 * 200) + # length(addresses) * trunc(beacon_summary_high_estimation_bytes / @floor_upload_speed * 1000) + # end def get_timeout(%GetUnspentOutputs{}), do: get_max_timeout() def get_timeout(%GetTransactionInputs{}), do: get_max_timeout()