diff --git a/lib/archethic/beacon_chain/network_coordinates.ex b/lib/archethic/beacon_chain/network_coordinates.ex index 4e947cd31..42df76327 100644 --- a/lib/archethic/beacon_chain/network_coordinates.ex +++ b/lib/archethic/beacon_chain/network_coordinates.ex @@ -60,6 +60,8 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do @spec get_patch_from_latencies(Nx.Tensor.t()) :: list(String.t()) def get_patch_from_latencies(matrix) do if Nx.size(matrix) > 1 do + start_time = System.monotonic_time() + formated_matrix = matrix |> Nx.as_type(:f64) @@ -68,7 +70,17 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do center_mass = compute_distance_from_center_mass(formated_matrix) gram_matrix = get_gram_matrix(formated_matrix, center_mass) {x, y} = get_coordinates(gram_matrix) - get_patch_digits(x, y) + network_patches = get_patch_digits(x, y) + + :telemetry.execute( + [:archethic, :beacon_chain, :network_coordinates, :compute_patch], + %{ + duration: System.monotonic_time() - start_time + }, + %{matrix_size: Nx.size(matrix)} + ) + + network_patches else [] end diff --git a/lib/archethic/beacon_chain/subset.ex b/lib/archethic/beacon_chain/subset.ex index 7595d38cd..e5f0789e1 100644 --- a/lib/archethic/beacon_chain/subset.ex +++ b/lib/archethic/beacon_chain/subset.ex @@ -32,6 +32,9 @@ defmodule Archethic.BeaconChain.Subset do alias Archethic.PubSub + alias Archethic.SelfRepair + + alias Archethic.TaskSupervisor alias Archethic.TransactionChain.TransactionSummary alias Archethic.Utils @@ -268,7 +271,7 @@ defmodule Archethic.BeaconChain.Subset do end defp forward_attestation?(slot_time, subset, node_public_key) do - # We need to forward the transaction if this is the last slot of a summary + # We need to forward the transaction if this is the last slot of a summary # and we are not a summary node because the current slot will not be sent to summary node summary_time?(slot_time) and not beacon_summary_node?(subset, slot_time, node_public_key) end @@ -374,7 +377,8 @@ defmodule Archethic.BeaconChain.Subset do else Logger.debug("Create beacon summary", beacon_subset: Base.encode16(subset)) - patch_task = Task.async(fn -> get_network_patches(subset, time) end) + patch_task = + Task.Supervisor.async_nolink(TaskSupervisor, fn -> get_network_patches(subset, time) end) summary = %Summary{ @@ -386,7 +390,24 @@ defmodule Archethic.BeaconChain.Subset do P2PSampling.list_nodes_to_sample(subset) ) - network_patches = Task.await(patch_task) + network_patches_timeout = + SelfRepair.next_repair_time() + |> DateTime.diff(DateTime.utc_now()) + # We take 10% of the next repair time to determine the timeout + |> Kernel.*(0.9) + |> Kernel.*(1000) + |> round() + + network_patches = + case Task.yield(patch_task, network_patches_timeout) || Task.shutdown(patch_task) do + {:ok, network_patches} -> + network_patches + + _ -> + Logger.warning("Build network patches takes more than #{network_patches_timeout} ms", beacon_subset: Base.encode16(subset)) + [] + end + BeaconChain.write_beacon_summary(%{summary | network_patches: network_patches}) :ok diff --git a/lib/archethic/beacon_chain/subset/stats_collector.ex b/lib/archethic/beacon_chain/subset/stats_collector.ex index 7a48af937..98276b49e 100644 --- a/lib/archethic/beacon_chain/subset/stats_collector.ex +++ b/lib/archethic/beacon_chain/subset/stats_collector.ex @@ -8,6 +8,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do use GenServer alias Archethic.BeaconChain.NetworkCoordinates + alias Archethic.TaskSupervisor require Logger @@ -31,7 +32,12 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do end def handle_call({:get, summary_time}, from, state = %{fetching_task: nil}) do - task = Task.async(fn -> NetworkCoordinates.fetch_network_stats(summary_time) end) + task = + Task.Supervisor.async_nolink(TaskSupervisor, fn -> + start_time = System.monotonic_time() + stats = NetworkCoordinates.fetch_network_stats(summary_time) + {stats, start_time} + end) new_state = state @@ -49,8 +55,19 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do {:noreply, new_state} end - def handle_info({ref, stats}, state = %{clients: clients, fetching_task: %Task{ref: ref_task}}) + def handle_info( + {ref, {stats, start_time}}, + state = %{clients: clients, fetching_task: %Task{ref: ref_task}} + ) when ref_task == ref do + :telemetry.execute( + [:archethic, :beacon_chain, :network_coordinates, :collect_stats], + %{ + duration: System.monotonic_time() - start_time + }, + %{matrix_size: Nx.size(stats)} + ) + Enum.each(clients, &GenServer.reply(&1, stats)) new_state = diff --git a/lib/archethic/self_repair.ex b/lib/archethic/self_repair.ex index c49f81d84..231e9f16e 100755 --- a/lib/archethic/self_repair.ex +++ b/lib/archethic/self_repair.ex @@ -275,4 +275,10 @@ defmodule Archethic.SelfRepair do {:error, reason} end end + + @doc """ + Get the next repair time from the scheduler + """ + @spec next_repair_time() :: DateTime.t() + defdelegate next_repair_time, to: Scheduler end diff --git a/lib/archethic/self_repair/scheduler.ex b/lib/archethic/self_repair/scheduler.ex index 90384795e..cebf0162a 100644 --- a/lib/archethic/self_repair/scheduler.ex +++ b/lib/archethic/self_repair/scheduler.ex @@ -9,6 +9,7 @@ defmodule Archethic.SelfRepair.Scheduler do alias Archethic.{P2P, SelfRepair.Sync, TaskSupervisor, Utils, PubSub} alias Archethic.Bootstrap.Sync, as: BootstrapSync + alias Crontab.CronExpression.Parser, as: CronParser require Logger @@ -175,4 +176,20 @@ defmodule Archethic.SelfRepair.Scheduler do def get_interval do GenServer.call(__MODULE__, :get_interval) end + + @doc """ + Calculates the next time the self-repair mechanism should run based on the current interval. + + Args: + - ref_time: The reference time to calculate the next run time from. Defaults to the current UTC time. + + Returns: + - The next time the self-repair mechanism should run. + """ + @spec next_repair_time(DateTime.t()) :: DateTime.t() + def next_repair_time(ref_time \\ DateTime.utc_now()) do + get_interval() + |> CronParser.parse!(true) + |> Utils.next_date(ref_time) + end end diff --git a/lib/archethic/telemetry.ex b/lib/archethic/telemetry.ex index cc18eac5b..52c34d3c4 100644 --- a/lib/archethic/telemetry.ex +++ b/lib/archethic/telemetry.ex @@ -191,6 +191,22 @@ defmodule Archethic.Telemetry do tags: [:nb_summaries] ), counter("archethic.self_repair.resync.count", tags: [:network_chain]), + distribution("archethic.beacon_chain.network_coordinates.compute_patch", + unit: {:native, :second}, + reporter_options: [ + buckets: [0.01, 0.025, 0.05, 0.1, 0.3, 0.5, 0.8, 1, 1.5, 2, 5, 10, 20, 35, 60] + ], + measurement: :duration, + tags: [:matrix_size] + ), + distribution("archethic.beacon_chain.network_coordinates.collect_stats", + unit: {:native, :second}, + reporter_options: [ + buckets: [0.01, 0.025, 0.05, 0.1, 0.3, 0.5, 0.8, 1, 1.5, 2, 5, 10, 20, 35, 60] + ], + measurement: :duration, + tags: [:matrix_size] + ), # Archethic Web counter("archethic_web.hosting.cache_file.hit.count"), diff --git a/test/archethic/beacon_chain/subset_test.exs b/test/archethic/beacon_chain/subset_test.exs index 7b43dea1b..ec4309bf6 100644 --- a/test/archethic/beacon_chain/subset_test.exs +++ b/test/archethic/beacon_chain/subset_test.exs @@ -26,6 +26,8 @@ defmodule Archethic.BeaconChain.SubsetTest do alias Archethic.P2P.Message.Ping alias Archethic.P2P.Node + alias Archethic.SelfRepair.Scheduler, as: SelfRepairScheduler + alias Archethic.TransactionChain.TransactionSummary import Mox @@ -350,6 +352,9 @@ defmodule Archethic.BeaconChain.SubsetTest do summary_interval = "*/3 * * * *" + # This is needed to get network coordinates's task timeout + start_supervised!({SelfRepairScheduler, interval: "*/10 * * * *"}) + start_supervised!({SummaryTimer, interval: summary_interval}) start_supervised!({SlotTimer, interval: "*/1 * * * *"}) start_supervised!(SummaryCache)