diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index 6a53b1822..08c826355 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -136,8 +136,6 @@ defmodule Archethic.BeaconChain do end end - def load_transaction(_), do: :ok - defp validate_slot(slot = %Slot{}) do cond do !SlotValidation.valid_transaction_attestations?(slot) -> diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 540c8c647..b2f24d564 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -126,7 +126,8 @@ defmodule Archethic.SelfRepair.Sync do # Process first the old aggregates fetch_summaries_aggregates(last_sync_date, last_summary_time, download_nodes) - |> Enum.each(&process_summary_aggregate(&1, download_nodes)) + |> Stream.each(&process_summary_aggregate(&1, download_nodes)) + |> Stream.run() # Then process the last one to have the last P2P view last_aggregate = BeaconChain.fetch_and_aggregate_summaries(last_summary_time, download_nodes) @@ -141,24 +142,31 @@ defmodule Archethic.SelfRepair.Sync do end defp fetch_summaries_aggregates(last_sync_date, last_summary_time, download_nodes) do - last_sync_date - |> BeaconChain.next_summary_dates() - # Take only the previous summaries before the last one - |> Stream.take_while(fn date -> - DateTime.compare(date, last_summary_time) == - :lt - end) - # Fetch the beacon summaries aggregate - |> Task.async_stream(fn date -> - Logger.debug("Fetch summary aggregate for #{date}") - - storage_nodes = - date - |> Crypto.derive_beacon_aggregate_address() - |> Election.chain_storage_nodes(download_nodes) + dates = + last_sync_date + |> BeaconChain.next_summary_dates() + # Take only the previous summaries before the last one + |> Stream.take_while(fn date -> + DateTime.compare(date, last_summary_time) == + :lt + end) - BeaconChain.fetch_summaries_aggregate(date, storage_nodes) - end) + # Fetch the beacon summaries aggregate + Task.Supervisor.async_stream( + TaskSupervisor, + dates, + fn date -> + Logger.debug("Fetch summary aggregate for #{date}") + + storage_nodes = + date + |> Crypto.derive_beacon_aggregate_address() + |> Election.chain_storage_nodes(download_nodes) + + BeaconChain.fetch_summaries_aggregate(date, storage_nodes) + end, + max_concurrency: 2 + ) |> Stream.filter(fn {:ok, {:ok, %SummaryAggregate{}}} -> true