Skip to content

Commit

Permalink
New system of node availability based on connection time (#574)
Browse files Browse the repository at this point in the history
* Decrease node availability on request timeout
* Implement availability timer in connection module
* Add availability time in slots
* Aggregate availability time in summary
  • Loading branch information
Neylix committed Sep 19, 2022
1 parent c60dc07 commit 58e9a6f
Show file tree
Hide file tree
Showing 15 changed files with 441 additions and 115 deletions.
38 changes: 18 additions & 20 deletions lib/archethic/beacon_chain/slot.ex
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,12 @@ defmodule Archethic.BeaconChain.Slot do
## Examples
iex> %Slot{
...> p2p_view: %{ availabilities: <<0::1, 0::1, 0::1>>, network_stats: [] }
...> p2p_view: %{ availabilities: <<0::16, 0::16, 0::16>>, network_stats: [] }
...> }
...> |> Slot.add_p2p_view([{true, 10 }, {false, 0 }, {true, 50 }])
...> |> Slot.add_p2p_view([{600, 10 }, {0, 0 }, {356, 50 }])
%Slot{
p2p_view: %{
availabilities: <<1::1, 0::1, 1::1>>,
availabilities: <<600::16, 0::16, 356::16>>,
network_stats: [
%{ latency: 10 },
%{ latency: 0},
Expand All @@ -334,16 +334,10 @@ defmodule Archethic.BeaconChain.Slot do
def add_p2p_view(slot = %__MODULE__{}, p2p_views) do
%{availabilities: availabilities, network_stats: network_stats} =
p2p_views
|> Enum.reduce(%{availabilities: [], network_stats: []}, fn
{true, latency}, acc ->
acc
|> Map.update!(:availabilities, &(&1 ++ [<<1::1>>]))
|> Map.update!(:network_stats, &(&1 ++ [%{latency: latency}]))

{false, _}, acc ->
acc
|> Map.update!(:availabilities, &(&1 ++ [<<0::1>>]))
|> Map.update!(:network_stats, &(&1 ++ [%{latency: 0}]))
|> Enum.reduce(%{availabilities: [], network_stats: []}, fn {availability, latency}, acc ->
acc
|> Map.update!(:availabilities, &(&1 ++ [<<availability::16>>]))
|> Map.update!(:network_stats, &(&1 ++ [%{latency: latency}]))
end)

%{
Expand Down Expand Up @@ -385,7 +379,7 @@ defmodule Archethic.BeaconChain.Slot do
...> timestamp: ~U[2020-06-25 15:11:53Z]
...> }],
...> p2p_view: %{
...> availabilities: <<1::1, 0::1>>,
...> availabilities: <<600::16, 356::16>>,
...> network_stats: [
...> %{ latency: 10},
...> %{ latency: 0}
Expand Down Expand Up @@ -432,10 +426,10 @@ defmodule Archethic.BeaconChain.Slot do
100, 169, 225, 113, 249, 125, 21, 168, 14, 196, 222, 140, 87, 143, 241,
# Node readyness timestamp
94, 244, 190, 185,
# P2P view bitstring size
# P2P view network stats length
0, 2,
# P2P view availabilies
1::1, 0::1,
600::16, 356::16,
# P2P view network stats (1st node)
10,
# P2P view network stats (2nd node)
Expand Down Expand Up @@ -477,7 +471,7 @@ defmodule Archethic.BeaconChain.Slot do
<<1::8, subset::binary, DateTime.to_unix(slot_time)::32,
encoded_transaction_attestations_len::binary, transaction_attestations_bin::binary,
encoded_end_of_node_synchronizations_len::binary, end_of_node_synchronizations_bin::binary,
bit_size(availabilities)::16, availabilities::bitstring, net_stats_bin::binary>>
length(network_stats)::16, availabilities::bitstring, net_stats_bin::binary>>
end

@doc """
Expand All @@ -494,7 +488,7 @@ defmodule Archethic.BeaconChain.Slot do
...> 43, 216, 176, 11, 159, 188, 119, 6, 8, 48, 201, 244, 138, 99, 52, 22, 1, 97, 123,
...> 140, 195, 1, 1, 0, 0, 38, 105, 235, 147, 234, 114, 41, 1, 152, 148, 120, 31, 200,
...> 255, 174, 190, 91, 100, 169, 225, 113, 249, 125, 21, 168, 14, 196, 222, 140, 87,
...> 143, 241, 94, 244, 190, 185, 0, 2, 1::1, 0::1, 10, 0 >>
...> 143, 241, 94, 244, 190, 185, 0, 2, 600::16, 356::16, 10, 0 >>
...> |> Slot.deserialize()
{
%Slot{
Expand Down Expand Up @@ -523,7 +517,7 @@ defmodule Archethic.BeaconChain.Slot do
timestamp: ~U[2020-06-25 15:11:53Z]
}],
p2p_view: %{
availabilities: <<1::1, 0::1>>,
availabilities: <<600::16, 356::16>>,
network_stats: [
%{ latency: 10},
%{ latency: 0}
Expand All @@ -545,7 +539,11 @@ defmodule Archethic.BeaconChain.Slot do
{end_of_node_synchronizations, rest} =
deserialize_end_of_node_synchronizations(rest, nb_end_of_sync, [])

<<p2p_view_size::16, availabilities::bitstring-size(p2p_view_size), rest::bitstring>> = rest
<<p2p_view_size::16, rest::bitstring>> = rest

availabilities_size = p2p_view_size * 2

<<availabilities::binary-size(availabilities_size), rest::bitstring>> = rest

{network_stats, rest} = deserialize_network_stats(rest, p2p_view_size, [])

Expand Down
5 changes: 5 additions & 0 deletions lib/archethic/beacon_chain/slot_timer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ defmodule Archethic.BeaconChain.SlotTimer do
|> Enum.to_list()
end

def get_time_interval(unit \\ :second) do
now = DateTime.utc_now()
DateTime.diff(next_slot(now), previous_slot(now), unit)
end

defp get_interval do
[{_, interval}] = :ets.lookup(@slot_timer_ets, :interval)
interval
Expand Down
52 changes: 43 additions & 9 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ defmodule Archethic.BeaconChain.Subset do
alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Client
alias Archethic.P2P.Message.NewBeaconSlot
alias Archethic.P2P.Message.BeaconUpdate

Expand Down Expand Up @@ -125,8 +126,22 @@ defmodule Archethic.BeaconChain.Subset do
{:create_slot, time},
state = %{subset: subset, node_public_key: node_public_key, current_slot: current_slot}
) do
nodes_availability_times =
P2PSampling.list_nodes_to_sample(subset)
|> Task.async_stream(fn node ->
if node.first_public_key == Crypto.first_node_public_key() do
SlotTimer.get_time_interval()
else
Client.get_availability_timer(node.first_public_key, true)
end
end)
|> Enum.map(fn
{:ok, res} -> res
_ -> 0
end)

if beacon_slot_node?(subset, time, node_public_key) do
handle_slot(time, current_slot)
handle_slot(time, current_slot, nodes_availability_times)

if summary_time?(time) and beacon_summary_node?(subset, time, node_public_key) do
handle_summary(time, subset)
Expand Down Expand Up @@ -175,9 +190,23 @@ defmodule Archethic.BeaconChain.Subset do

# Request the P2P view sampling if the not perfomed from the last 3 seconds
if update_p2p_view?(state) do
nodes_availability_times =
P2PSampling.list_nodes_to_sample(subset)
|> Task.async_stream(fn node ->
if node.first_public_key == Crypto.first_node_public_key() do
SlotTimer.get_time_interval()
else
Client.get_availability_timer(node.first_public_key, false)
end
end)
|> Enum.map(fn
{:ok, res} -> res
_ -> 0
end)

new_state =
state
|> Map.put(:current_slot, add_p2p_view(new_slot))
|> Map.put(:current_slot, add_p2p_view(new_slot, nodes_availability_times))
|> Map.put(:sampling_time, DateTime.utc_now())

{:noreply, new_state}
Expand Down Expand Up @@ -225,9 +254,10 @@ defmodule Archethic.BeaconChain.Subset do

defp handle_slot(
time,
current_slot = %Slot{subset: subset}
current_slot = %Slot{subset: subset},
nodes_availability_times
) do
current_slot = ensure_p2p_view(current_slot)
current_slot = ensure_p2p_view(current_slot, nodes_availability_times)

# Avoid to store or dispatch an empty beacon's slot
unless Slot.empty?(current_slot) do
Expand Down Expand Up @@ -326,17 +356,21 @@ defmodule Archethic.BeaconChain.Subset do
|> Utils.key_in_node_list?(node_public_key)
end

defp add_p2p_view(current_slot = %Slot{subset: subset}) do
p2p_views = P2PSampling.get_p2p_views(P2PSampling.list_nodes_to_sample(subset))
defp add_p2p_view(current_slot = %Slot{subset: subset}, nodes_availability_times) do
p2p_views =
P2PSampling.get_p2p_views(
P2PSampling.list_nodes_to_sample(subset),
nodes_availability_times
)

Slot.add_p2p_view(current_slot, p2p_views)
end

defp ensure_p2p_view(slot = %Slot{p2p_view: %{availabilities: <<>>}}) do
add_p2p_view(slot)
defp ensure_p2p_view(slot = %Slot{p2p_view: %{availabilities: <<>>}}, nodes_availability_times) do
add_p2p_view(slot, nodes_availability_times)
end

defp ensure_p2p_view(slot = %Slot{}), do: slot
defp ensure_p2p_view(slot = %Slot{}, _), do: slot

@doc """
Add node public key to the corresponding subset for beacon updates
Expand Down
17 changes: 9 additions & 8 deletions lib/archethic/beacon_chain/subset/p2p_sampling.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,20 @@ defmodule Archethic.BeaconChain.Subset.P2PSampling do
@doc """
Get the p2p view for the given nodes while computing the bandwidth from the latency
"""
@spec get_p2p_views(list(Node.t())) :: list(p2p_view())
def get_p2p_views(nodes) when is_list(nodes) do
@spec get_p2p_views(list(Node.t()), list(non_neg_integer())) :: list(p2p_view())
def get_p2p_views(nodes, nodes_availability_times) when is_list(nodes) do
timeout = 1_000

Task.Supervisor.async_stream_nolink(TaskSupervisor, nodes, &do_sample_p2p_view(&1, timeout),
on_timeout: :kill_task
)
|> Enum.with_index()
|> Enum.map(fn
{:ok, res} ->
res
{{:ok, latency}, index} ->
{Enum.at(nodes_availability_times, index), latency}

{:exit, :timeout} ->
{false, timeout}
{{:exit, :timeout}, index} ->
{Enum.at(nodes_availability_times, index), 0}
end)
end

Expand All @@ -47,10 +48,10 @@ defmodule Archethic.BeaconChain.Subset.P2PSampling do
case P2P.send_message(node, %Ping{}, timeout) do
{:ok, %Ok{}} ->
end_time = System.monotonic_time(:millisecond)
{true, end_time - start_time}
end_time - start_time

{:error, _} ->
{false, timeout}
0
end
end
end
Loading

0 comments on commit 58e9a6f

Please sign in to comment.