Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle timeout network coordinates #1085

Merged
merged 4 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/archethic/beacon_chain/network_coordinates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
@spec get_patch_from_latencies(Nx.Tensor.t()) :: list(String.t())
def get_patch_from_latencies(matrix) do
if Nx.size(matrix) > 1 do
start_time = System.monotonic_time()

formated_matrix =
matrix
|> Nx.as_type(:f64)
Expand All @@ -68,7 +70,17 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
center_mass = compute_distance_from_center_mass(formated_matrix)
gram_matrix = get_gram_matrix(formated_matrix, center_mass)
{x, y} = get_coordinates(gram_matrix)
get_patch_digits(x, y)
network_patches = get_patch_digits(x, y)

:telemetry.execute(
[:archethic, :beacon_chain, :network_coordinates, :compute_patch],
%{
duration: System.monotonic_time() - start_time
},
%{matrix_size: Nx.size(matrix)}
)

network_patches
else
[]
end
Expand Down
27 changes: 24 additions & 3 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ defmodule Archethic.BeaconChain.Subset do

alias Archethic.PubSub

alias Archethic.SelfRepair

alias Archethic.TaskSupervisor
alias Archethic.TransactionChain.TransactionSummary

alias Archethic.Utils
Expand Down Expand Up @@ -268,7 +271,7 @@ defmodule Archethic.BeaconChain.Subset do
end

defp forward_attestation?(slot_time, subset, node_public_key) do
  # The current slot is not sent to the summary nodes when it is the last
  # slot of a summary, so a node which is not a summary node itself has to
  # forward the attestation.
  last_slot_of_summary? = summary_time?(slot_time)

  last_slot_of_summary? and
    not beacon_summary_node?(subset, slot_time, node_public_key)
end
Expand Down Expand Up @@ -374,7 +377,8 @@ defmodule Archethic.BeaconChain.Subset do
else
Logger.debug("Create beacon summary", beacon_subset: Base.encode16(subset))

patch_task = Task.async(fn -> get_network_patches(subset, time) end)
patch_task =
Task.Supervisor.async_nolink(TaskSupervisor, fn -> get_network_patches(subset, time) end)

summary =
%Summary{
Expand All @@ -386,7 +390,24 @@ defmodule Archethic.BeaconChain.Subset do
P2PSampling.list_nodes_to_sample(subset)
)

network_patches = Task.await(patch_task)
network_patches_timeout =
SelfRepair.next_repair_time()
|> DateTime.diff(DateTime.utc_now())
# We take 90% of the time remaining before the next repair as the timeout
|> Kernel.*(0.9)
|> Kernel.*(1000)
|> round()

network_patches =
case Task.yield(patch_task, network_patches_timeout) || Task.shutdown(patch_task) do
{:ok, network_patches} ->
network_patches

_ ->
Logger.warning("Build network patches takes more than #{network_patches_timeout} ms", beacon_subset: Base.encode16(subset))
[]
end

BeaconChain.write_beacon_summary(%{summary | network_patches: network_patches})

:ok
Expand Down
21 changes: 19 additions & 2 deletions lib/archethic/beacon_chain/subset/stats_collector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
use GenServer

alias Archethic.BeaconChain.NetworkCoordinates
alias Archethic.TaskSupervisor

require Logger

Expand All @@ -31,7 +32,12 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
end

def handle_call({:get, summary_time}, from, state = %{fetching_task: nil}) do
task = Task.async(fn -> NetworkCoordinates.fetch_network_stats(summary_time) end)
task =
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
start_time = System.monotonic_time()
stats = NetworkCoordinates.fetch_network_stats(summary_time)
{stats, start_time}
end)

new_state =
state
Expand All @@ -49,8 +55,19 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
{:noreply, new_state}
end

def handle_info({ref, stats}, state = %{clients: clients, fetching_task: %Task{ref: ref_task}})
def handle_info(
{ref, {stats, start_time}},
state = %{clients: clients, fetching_task: %Task{ref: ref_task}}
)
when ref_task == ref do
:telemetry.execute(
[:archethic, :beacon_chain, :network_coordinates, :collect_stats],
%{
duration: System.monotonic_time() - start_time
},
%{matrix_size: Nx.size(stats)}
)

Enum.each(clients, &GenServer.reply(&1, stats))

new_state =
Expand Down
6 changes: 6 additions & 0 deletions lib/archethic/self_repair.ex
Original file line number Diff line number Diff line change
Expand Up @@ -275,4 +275,10 @@ defmodule Archethic.SelfRepair do
{:error, reason}
end
end

@doc """
Return the date of the next self-repair, as computed by the scheduler.
"""
@spec next_repair_time() :: DateTime.t()
defdelegate next_repair_time(), to: Scheduler, as: :next_repair_time
end
17 changes: 17 additions & 0 deletions lib/archethic/self_repair/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Archethic.SelfRepair.Scheduler do
alias Archethic.{P2P, SelfRepair.Sync, TaskSupervisor, Utils, PubSub}

alias Archethic.Bootstrap.Sync, as: BootstrapSync
alias Crontab.CronExpression.Parser, as: CronParser

require Logger

Expand Down Expand Up @@ -175,4 +176,20 @@ defmodule Archethic.SelfRepair.Scheduler do
# Synchronous request to the process registered under this module's name.
def get_interval, do: GenServer.call(__MODULE__, :get_interval)

@doc """
Compute the next time the self-repair mechanism should run.

The scheduler's current interval (see `get_interval/0`) is parsed as a cron
expression and the next matching date is looked up starting from `ref_time`.

## Parameters

  - `ref_time`: reference time the next run is calculated from
    (defaults to the current UTC time)

## Returns

The next time the self-repair mechanism should run.
"""
@spec next_repair_time(DateTime.t()) :: DateTime.t()
def next_repair_time(ref_time \\ DateTime.utc_now()) do
  # `parse!/2` raises if the interval is not a valid cron expression.
  cron_expression = CronParser.parse!(get_interval(), true)

  Utils.next_date(cron_expression, ref_time)
end
end
16 changes: 16 additions & 0 deletions lib/archethic/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,22 @@ defmodule Archethic.Telemetry do
tags: [:nb_summaries]
),
counter("archethic.self_repair.resync.count", tags: [:network_chain]),
distribution("archethic.beacon_chain.network_coordinates.compute_patch",
unit: {:native, :second},
reporter_options: [
buckets: [0.01, 0.025, 0.05, 0.1, 0.3, 0.5, 0.8, 1, 1.5, 2, 5, 10, 20, 35, 60]
],
measurement: :duration,
tags: [:matrix_size]
),
distribution("archethic.beacon_chain.network_coordinates.collect_stats",
unit: {:native, :second},
reporter_options: [
buckets: [0.01, 0.025, 0.05, 0.1, 0.3, 0.5, 0.8, 1, 1.5, 2, 5, 10, 20, 35, 60]
],
measurement: :duration,
tags: [:matrix_size]
),

# Archethic Web
counter("archethic_web.hosting.cache_file.hit.count"),
Expand Down
5 changes: 5 additions & 0 deletions test/archethic/beacon_chain/subset_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ defmodule Archethic.BeaconChain.SubsetTest do
alias Archethic.P2P.Message.Ping
alias Archethic.P2P.Node

alias Archethic.SelfRepair.Scheduler, as: SelfRepairScheduler

alias Archethic.TransactionChain.TransactionSummary

import Mox
Expand Down Expand Up @@ -350,6 +352,9 @@ defmodule Archethic.BeaconChain.SubsetTest do

summary_interval = "*/3 * * * *"

# This is needed to get the timeout of the network coordinates task
start_supervised!({SelfRepairScheduler, interval: "*/10 * * * *"})

start_supervised!({SummaryTimer, interval: summary_interval})
start_supervised!({SlotTimer, interval: "*/1 * * * *"})
start_supervised!(SummaryCache)
Expand Down