diff --git a/lib/archethic/beacon_chain/slot.ex b/lib/archethic/beacon_chain/slot.ex index 3c3b604b0..50322d706 100644 --- a/lib/archethic/beacon_chain/slot.ex +++ b/lib/archethic/beacon_chain/slot.ex @@ -316,12 +316,12 @@ defmodule Archethic.BeaconChain.Slot do ## Examples iex> %Slot{ - ...> p2p_view: %{ availabilities: <<0::1, 0::1, 0::1>>, network_stats: [] } + ...> p2p_view: %{ availabilities: <<0::16, 0::16, 0::16>>, network_stats: [] } ...> } - ...> |> Slot.add_p2p_view([{true, 10 }, {false, 0 }, {true, 50 }]) + ...> |> Slot.add_p2p_view([{600, 10 }, {0, 0 }, {356, 50 }]) %Slot{ p2p_view: %{ - availabilities: <<1::1, 0::1, 1::1>>, + availabilities: <<600::16, 0::16, 356::16>>, network_stats: [ %{ latency: 10 }, %{ latency: 0}, @@ -334,16 +334,10 @@ defmodule Archethic.BeaconChain.Slot do def add_p2p_view(slot = %__MODULE__{}, p2p_views) do %{availabilities: availabilities, network_stats: network_stats} = p2p_views - |> Enum.reduce(%{availabilities: [], network_stats: []}, fn - {true, latency}, acc -> - acc - |> Map.update!(:availabilities, &(&1 ++ [<<1::1>>])) - |> Map.update!(:network_stats, &(&1 ++ [%{latency: latency}])) - - {false, _}, acc -> - acc - |> Map.update!(:availabilities, &(&1 ++ [<<0::1>>])) - |> Map.update!(:network_stats, &(&1 ++ [%{latency: 0}])) + |> Enum.reduce(%{availabilities: [], network_stats: []}, fn {availability, latency}, acc -> + acc + |> Map.update!(:availabilities, &(&1 ++ [<>])) + |> Map.update!(:network_stats, &(&1 ++ [%{latency: latency}])) end) %{ @@ -385,7 +379,7 @@ defmodule Archethic.BeaconChain.Slot do ...> timestamp: ~U[2020-06-25 15:11:53Z] ...> }], ...> p2p_view: %{ - ...> availabilities: <<1::1, 0::1>>, + ...> availabilities: <<600::16, 356::16>>, ...> network_stats: [ ...> %{ latency: 10}, ...> %{ latency: 0} @@ -432,10 +426,10 @@ defmodule Archethic.BeaconChain.Slot do 100, 169, 225, 113, 249, 125, 21, 168, 14, 196, 222, 140, 87, 143, 241, # Node readyness timestamp 94, 244, 190, 185, - # P2P view bitstring size + # P2P view network stats length 0, 2, # P2P view availabilies - 1::1, 0::1, + 600::16, 356::16, # P2P view network stats (1st node) 10, # P2P view network stats (2nd node) @@ -477,7 +471,7 @@ defmodule Archethic.BeaconChain.Slot do <<1::8, subset::binary, DateTime.to_unix(slot_time)::32, encoded_transaction_attestations_len::binary, transaction_attestations_bin::binary, encoded_end_of_node_synchronizations_len::binary, end_of_node_synchronizations_bin::binary, - bit_size(availabilities)::16, availabilities::bitstring, net_stats_bin::binary>> + length(network_stats)::16, availabilities::bitstring, net_stats_bin::binary>> end @doc """ @@ -494,7 +488,7 @@ defmodule Archethic.BeaconChain.Slot do ...> 43, 216, 176, 11, 159, 188, 119, 6, 8, 48, 201, 244, 138, 99, 52, 22, 1, 97, 123, ...> 140, 195, 1, 1, 0, 0, 38, 105, 235, 147, 234, 114, 41, 1, 152, 148, 120, 31, 200, ...> 255, 174, 190, 91, 100, 169, 225, 113, 249, 125, 21, 168, 14, 196, 222, 140, 87, - ...> 143, 241, 94, 244, 190, 185, 0, 2, 1::1, 0::1, 10, 0 >> + ...> 143, 241, 94, 244, 190, 185, 0, 2, 600::16, 356::16, 10, 0 >> ...> |> Slot.deserialize() { %Slot{ @@ -523,7 +517,7 @@ defmodule Archethic.BeaconChain.Slot do timestamp: ~U[2020-06-25 15:11:53Z] }], p2p_view: %{ - availabilities: <<1::1, 0::1>>, + availabilities: <<600::16, 356::16>>, network_stats: [ %{ latency: 10}, %{ latency: 0} @@ -545,7 +539,11 @@ defmodule Archethic.BeaconChain.Slot do {end_of_node_synchronizations, rest} = deserialize_end_of_node_synchronizations(rest, nb_end_of_sync, []) - <> = rest + <> = rest + + availabilities_size = p2p_view_size * 2 + + <> = rest {network_stats, rest} = deserialize_network_stats(rest, p2p_view_size, []) diff --git a/lib/archethic/beacon_chain/slot_timer.ex b/lib/archethic/beacon_chain/slot_timer.ex index 3805c6769..dd0c6e063 100644 --- a/lib/archethic/beacon_chain/slot_timer.ex +++ b/lib/archethic/beacon_chain/slot_timer.ex @@ -86,6 +86,11 @@ defmodule Archethic.BeaconChain.SlotTimer do |> Enum.to_list() end + def get_time_interval(unit \\ :second) do + now = DateTime.utc_now() + DateTime.diff(next_slot(now), previous_slot(now), unit) + end + defp get_interval do [{_, interval}] = :ets.lookup(@slot_timer_ets, :interval) interval diff --git a/lib/archethic/beacon_chain/subset.ex b/lib/archethic/beacon_chain/subset.ex index 7f99bebbb..565555241 100644 --- a/lib/archethic/beacon_chain/subset.ex +++ b/lib/archethic/beacon_chain/subset.ex @@ -22,6 +22,7 @@ defmodule Archethic.BeaconChain.Subset do alias Archethic.Election alias Archethic.P2P + alias Archethic.P2P.Client alias Archethic.P2P.Message.NewBeaconSlot alias Archethic.P2P.Message.BeaconUpdate @@ -125,8 +126,22 @@ defmodule Archethic.BeaconChain.Subset do {:create_slot, time}, state = %{subset: subset, node_public_key: node_public_key, current_slot: current_slot} ) do + nodes_availability_times = + P2PSampling.list_nodes_to_sample(subset) + |> Task.async_stream(fn node -> + if node.first_public_key == Crypto.first_node_public_key() do + SlotTimer.get_time_interval() + else + Client.get_availability_timer(node.first_public_key, true) + end + end) + |> Enum.map(fn + {:ok, res} -> res + _ -> 0 + end) + if beacon_slot_node?(subset, time, node_public_key) do - handle_slot(time, current_slot) + handle_slot(time, current_slot, nodes_availability_times) if summary_time?(time) and beacon_summary_node?(subset, time, node_public_key) do handle_summary(time, subset) @@ -175,9 +190,23 @@ defmodule Archethic.BeaconChain.Subset do # Request the P2P view sampling if the not perfomed from the last 3 seconds if update_p2p_view?(state) do + nodes_availability_times = + P2PSampling.list_nodes_to_sample(subset) + |> Task.async_stream(fn node -> + if node.first_public_key == Crypto.first_node_public_key() do + SlotTimer.get_time_interval() + else + Client.get_availability_timer(node.first_public_key, false) + end + end) + |> Enum.map(fn + {:ok, res} -> res + _ -> 0 + end) + new_state = state - |> Map.put(:current_slot, add_p2p_view(new_slot)) + |> Map.put(:current_slot, add_p2p_view(new_slot, nodes_availability_times)) |> Map.put(:sampling_time, DateTime.utc_now()) {:noreply, new_state} @@ -225,9 +254,10 @@ defmodule Archethic.BeaconChain.Subset do defp handle_slot( time, - current_slot = %Slot{subset: subset} + current_slot = %Slot{subset: subset}, + nodes_availability_times ) do - current_slot = ensure_p2p_view(current_slot) + current_slot = ensure_p2p_view(current_slot, nodes_availability_times) # Avoid to store or dispatch an empty beacon's slot unless Slot.empty?(current_slot) do @@ -326,17 +356,21 @@ defmodule Archethic.BeaconChain.Subset do |> Utils.key_in_node_list?(node_public_key) end - defp add_p2p_view(current_slot = %Slot{subset: subset}) do - p2p_views = P2PSampling.get_p2p_views(P2PSampling.list_nodes_to_sample(subset)) + defp add_p2p_view(current_slot = %Slot{subset: subset}, nodes_availability_times) do + p2p_views = + P2PSampling.get_p2p_views( + P2PSampling.list_nodes_to_sample(subset), + nodes_availability_times + ) Slot.add_p2p_view(current_slot, p2p_views) end - defp ensure_p2p_view(slot = %Slot{p2p_view: %{availabilities: <<>>}}) do - add_p2p_view(slot) + defp ensure_p2p_view(slot = %Slot{p2p_view: %{availabilities: <<>>}}, nodes_availability_times) do + add_p2p_view(slot, nodes_availability_times) end - defp ensure_p2p_view(slot = %Slot{}), do: slot + defp ensure_p2p_view(slot = %Slot{}, _), do: slot @doc """ Add node public key to the corresponding subset for beacon updates diff --git a/lib/archethic/beacon_chain/subset/p2p_sampling.ex b/lib/archethic/beacon_chain/subset/p2p_sampling.ex index e55e764f9..34f6abf84 100644 --- a/lib/archethic/beacon_chain/subset/p2p_sampling.ex +++ b/lib/archethic/beacon_chain/subset/p2p_sampling.ex @@ -25,19 +25,20 @@ defmodule Archethic.BeaconChain.Subset.P2PSampling do @doc """ Get the p2p view for the given nodes while computing the bandwidth from the latency """ - @spec get_p2p_views(list(Node.t())) :: list(p2p_view()) - def get_p2p_views(nodes) when is_list(nodes) do + @spec get_p2p_views(list(Node.t()), list(non_neg_integer())) :: list(p2p_view()) + def get_p2p_views(nodes, nodes_availability_times) when is_list(nodes) do timeout = 1_000 Task.Supervisor.async_stream_nolink(TaskSupervisor, nodes, &do_sample_p2p_view(&1, timeout), on_timeout: :kill_task ) + |> Enum.with_index() |> Enum.map(fn - {:ok, res} -> - res + {{:ok, latency}, index} -> + {Enum.at(nodes_availability_times, index), latency} - {:exit, :timeout} -> - {false, timeout} + {{:exit, :timeout}, index} -> + {Enum.at(nodes_availability_times, index), 0} end) end @@ -47,10 +48,10 @@ defmodule Archethic.BeaconChain.Subset.P2PSampling do case P2P.send_message(node, %Ping{}, timeout) do {:ok, %Ok{}} -> end_time = System.monotonic_time(:millisecond) - {true, end_time - start_time} + end_time - start_time {:error, _} -> - {false, timeout} + 0 end end end diff --git a/lib/archethic/beacon_chain/summary.ex b/lib/archethic/beacon_chain/summary.ex index 95cb6d79d..3cf0acf08 100644 --- a/lib/archethic/beacon_chain/summary.ex +++ b/lib/archethic/beacon_chain/summary.ex @@ -6,6 +6,7 @@ defmodule Archethic.BeaconChain.Summary do alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.BeaconChain.Slot + alias Archethic.BeaconChain.SlotTimer alias Archethic.BeaconChain.Slot.EndOfNodeSync alias Archethic.P2P.Node @@ -57,7 +58,9 @@ defmodule Archethic.BeaconChain.Summary do ### Aggregate the P2P view and the transaction summaries for a static list of nodes during the beacon chain - iex> Summary.aggregate_slots(%Summary{}, [ + iex> :ets.new(:archethic_slot_timer, [:named_table, :public, read_concurrency: true]) + ...> :ets.insert(:archethic_slot_timer, {:interval, "0 */10 * * * * *"}) + ...> Summary.aggregate_slots(%Summary{}, [ ...> %Slot{ ...> slot_time: ~U[2020-06-25 15:12:00Z], ...> transaction_attestations: [ @@ -71,7 +74,7 @@ defmodule Archethic.BeaconChain.Summary do ...> } ...> } ...> ], - ...> p2p_view: %{ availabilities: <<1::1, 0::1, 1::1>>} + ...> p2p_view: %{ availabilities: <<600::16, 0::16, 600::16>>} ...> }, ...> %Slot{ ...> slot_time: ~U[2020-06-25 15:12:00Z], @@ -86,16 +89,16 @@ defmodule Archethic.BeaconChain.Summary do ...> } ...> } ...> ], - ...> p2p_view: %{ availabilities: <<1::1, 0::1, 1::1>>} + ...> p2p_view: %{ availabilities: <<600::16, 0::16, 600::16>>} ...> }, - ...> %Slot{ p2p_view: %{availabilities: <<0::1, 1::1, 1::1>>}, slot_time: ~U[2020-06-25 15:11:50Z] }, - ...> %Slot{ p2p_view: %{availabilities: <<0::1, 1::1, 1::1>>}, slot_time: ~U[2020-06-25 15:11:50Z] }, - ...> %Slot{ p2p_view: %{availabilities: <<1::1, 1::1, 0::1>>}, slot_time: ~U[2020-06-25 15:11:40Z] }, - ...> %Slot{ p2p_view: %{availabilities: <<1::1, 0::1, 0::1>>}, slot_time: ~U[2020-06-25 15:11:40Z] }, - ...> %Slot{ p2p_view: %{availabilities: <<1::1, 1::1, 0::1>>}, slot_time: ~U[2020-06-25 15:11:40Z] }, - ...> %Slot{ p2p_view: %{availabilities: <<1::1, 1::1, 0::1>>}, slot_time: ~U[2020-06-25 15:11:30Z] }, - ...> %Slot{ p2p_view: %{availabilities: <<0::1, 1::1, 1::1>>}, slot_time: ~U[2020-06-25 15:11:30Z] }, - ...> %Slot{ p2p_view: %{availabilities: <<1::1, 1::1, 0::1>>}, slot_time: ~U[2020-06-25 15:11:30Z] } + ...> %Slot{ p2p_view: %{availabilities: <<600::16, 100::16, 600::16>>}, slot_time: ~U[2020-06-25 15:11:50Z] }, + ...> %Slot{ p2p_view: %{availabilities: <<600::16, 100::16, 600::16>>}, slot_time: ~U[2020-06-25 15:11:50Z] }, + ...> %Slot{ p2p_view: %{availabilities: <<600::16, 200::16, 470::16>>}, slot_time: ~U[2020-06-25 15:11:40Z] }, + ...> %Slot{ p2p_view: %{availabilities: <<600::16, 500::16, 0::16>>}, slot_time: ~U[2020-06-25 15:11:40Z] }, + ...> %Slot{ p2p_view: %{availabilities: <<0::16, 600::16, 300::16>>}, slot_time: ~U[2020-06-25 15:11:40Z] }, + ...> %Slot{ p2p_view: %{availabilities: <<600::16, 600::16, 300::16>>}, slot_time: ~U[2020-06-25 15:11:30Z] }, + ...> %Slot{ p2p_view: %{availabilities: <<600::16, 600::16, 300::16>>}, slot_time: ~U[2020-06-25 15:11:30Z] }, + ...> %Slot{ p2p_view: %{availabilities: <<0::16, 600::16, 300::16>>}, slot_time: ~U[2020-06-25 15:11:30Z] } ...> ], [ ...> %Node{first_public_key: "key1", enrollment_date: ~U[2020-06-25 15:11:00Z]}, ...> %Node{first_public_key: "key2", enrollment_date: ~U[2020-06-25 15:11:00Z]}, @@ -113,13 +116,15 @@ defmodule Archethic.BeaconChain.Summary do } } ], - node_availabilities: <<1::1, 1::1, 1::1>>, - node_average_availabilities: [0.7, 0.7, 0.50] + node_availabilities: <<1::1, 0::1, 1::1>>, + node_average_availabilities: [1.0, 0.4601449275362319, 0.7717391304347827] } ### Aggregate the P2P view and the transaction attestations with new node joining during the beacon chain epoch - iex> Summary.aggregate_slots(%Summary{}, [ + iex> :ets.new(:archethic_slot_timer, [:named_table, :public, read_concurrency: true]) + ...> :ets.insert(:archethic_slot_timer, {:interval, "0 */10 * * * * *"}) + ...> Summary.aggregate_slots(%Summary{}, [ ...> %Slot{ ...> slot_time: ~U[2020-06-25 15:12:00Z], ...> transaction_attestations: [ @@ -133,7 +138,7 @@ defmodule Archethic.BeaconChain.Summary do ...> } ...> } ...> ], - ...> p2p_view: %{ availabilities: <<1::1, 0::1, 1::1, 1::1>>} + ...> p2p_view: %{ availabilities: <<0::16, 0::16, 600::16, 600::16>>} ...> }, ...> %Slot{ ...> slot_time: ~U[2020-06-25 15:12:00Z], @@ -148,14 +153,14 @@ defmodule Archethic.BeaconChain.Summary do ...> } ...> } ...> ], - ...> p2p_view: %{ availabilities: <<1::1, 0::1, 1::1, 1::1>>} + ...> p2p_view: %{ availabilities: <<0::16, 0::16, 600::16, 600::16>>} ...> }, - ...> %Slot{ p2p_view: %{availabilities: <<0::1, 1::1, 1::1, 1::1>>}, slot_time: ~U[2020-06-25 15:11:50Z] }, - ...> %Slot{ p2p_view: %{availabilities: <<0::1, 0::1, 1::1, 1::1>>}, slot_time: ~U[2020-06-25 15:11:50Z] }, - ...> %Slot{ p2p_view: %{availabilities: <<0::1, 0::1, 1::1, 1::1>>}, slot_time: ~U[2020-06-25 15:11:50Z] }, - ...> %Slot{ p2p_view: %{availabilities: <<1::1, 1::1, 0::1>>}, slot_time: ~U[2020-06-25 15:11:40Z] }, - ...> %Slot{ p2p_view: %{availabilities: <<1::1, 0::1, 0::1>>}, slot_time: ~U[2020-06-25 15:11:40Z] }, - ...> %Slot{ p2p_view: %{availabilities: <<1::1, 1::1, 0::1>>}, slot_time: ~U[2020-06-25 15:11:30Z] } + ...> %Slot{ p2p_view: %{availabilities: <<200::16, 300::16, 600::16, 600::16>>}, slot_time: ~U[2020-06-25 15:11:50Z] }, + ...> %Slot{ p2p_view: %{availabilities: <<200::16, 200::16, 600::16, 600::16>>}, slot_time: ~U[2020-06-25 15:11:50Z] }, + ...> %Slot{ p2p_view: %{availabilities: <<600::16, 200::16, 600::16, 600::16>>}, slot_time: ~U[2020-06-25 15:11:50Z] }, + ...> %Slot{ p2p_view: %{availabilities: <<600::16, 450::16, 200::16>>}, slot_time: ~U[2020-06-25 15:11:40Z] }, + ...> %Slot{ p2p_view: %{availabilities: <<600::16, 450::16, 200::16>>}, slot_time: ~U[2020-06-25 15:11:40Z] }, + ...> %Slot{ p2p_view: %{availabilities: <<600::16, 600::16, 0::16>>}, slot_time: ~U[2020-06-25 15:11:30Z] } ...> ], [ ...> %Node{first_public_key: "key1", enrollment_date: ~U[2020-06-25 15:11:00Z]}, ...> %Node{first_public_key: "key2", enrollment_date: ~U[2020-06-25 15:11:00Z]}, @@ -175,7 +180,7 @@ defmodule Archethic.BeaconChain.Summary do } ], node_availabilities: <<1::1, 0::1, 1::1, 1::1>>, - node_average_availabilities: [0.625, 0.375, 0.625, 1.0] + node_average_availabilities: [0.5434782608695653, 0.48369565217391314, 0.6231884057971014, 1.0] } """ @spec aggregate_slots( @@ -256,20 +261,25 @@ defmodule Archethic.BeaconChain.Summary do end defp reduce_slot_availabilities( - %Slot{slot_time: slot_time, p2p_view: %{availabilities: availabilities}}, + %Slot{slot_time: slot_time, p2p_view: %{availabilities: availabilities_bin}}, acc, node_list ) do node_list_subset_time = node_list_at_slot_time(node_list, slot_time) + availabilities = for <>, do: availability_time + availabilities - |> Utils.bitstring_to_integer_list() |> Enum.with_index() - |> Enum.reduce(acc, fn {availability, i}, acc -> + |> Enum.reduce(acc, fn {availability_time, i}, acc -> node = Enum.at(node_list_subset_time, i) node_pos = Enum.find_index(node_list, &(&1.first_public_key == node.first_public_key)) - Map.update(acc, node_pos, [availability], &[availability | &1]) + availability_by_slot = + Map.get(acc, node_pos, %{}) + |> Map.update(slot_time, [availability_time], &[availability_time | &1]) + + Map.put(acc, node_pos, availability_by_slot) end) end @@ -287,12 +297,33 @@ defmodule Archethic.BeaconChain.Summary do {node_index, availabilities}, acc ) do - frequencies = Enum.frequencies(availabilities) - online_frequencies = Map.get(frequencies, 1, 0) - offline_frequencies = Map.get(frequencies, 0, 0) + # First, do a median for each slot + # Then do a wheighted mean of the result + map = + availabilities + |> Map.values() + |> Enum.map(&Utils.median(&1)) + |> Enum.with_index() + |> Enum.reduce(%{}, fn {slot_availability_time, slot_index}, acc -> + # 1,0 -> 1,1 -> 1,2 ... + # Weight for 144th slot = 15.4 + weight = 1 + slot_index / 10 + weighted_availability_time = slot_availability_time * weight + + acc + |> Map.update( + :total_availability_time, + weighted_availability_time, + &(&1 + weighted_availability_time) + ) + |> Map.update(:total_weight, weight, &(&1 + weight)) + end) - available? = online_frequencies >= offline_frequencies - avg_availability = online_frequencies / length(availabilities) + availability_time = Map.get(map, :total_availability_time) / Map.get(map, :total_weight) + avg_availability = availability_time / SlotTimer.get_time_interval() + # TODO We may change the value where the node is considered as available + # to get only stable nodes like avg_availability > 0.85 + available? = avg_availability > 0.5 acc |> Map.update!(:availabilities, fn bitstring -> diff --git a/lib/archethic/p2p/client.ex b/lib/archethic/p2p/client.ex index beddd1e49..53c5f7451 100644 --- a/lib/archethic/p2p/client.ex +++ b/lib/archethic/p2p/client.ex @@ -22,4 +22,6 @@ defmodule Archethic.P2P.Client do {:ok, Message.response()} | {:error, :timeout} | {:error, :closed} + + @callback get_availability_timer(Crypto.key(), boolean()) :: non_neg_integer() end diff --git a/lib/archethic/p2p/client/connection.ex b/lib/archethic/p2p/client/connection.ex index dc990e8f1..7764bb856 100644 --- a/lib/archethic/p2p/client/connection.ex +++ b/lib/archethic/p2p/client/connection.ex @@ -48,6 +48,22 @@ defmodule Archethic.P2P.Client.Connection do end end + @doc """ + Get the availability timer and reset it with a new start time if it was already started + """ + @spec get_availability_timer(Crypto.key(), boolean()) :: non_neg_integer() + def get_availability_timer(public_key, reset?) do + GenStateMachine.call(via_tuple(public_key), {:get_timer, reset?}) + end + + @doc """ + Start the availability timer if it wasn't + """ + @spec start_availability_timer(Crypto.key()) :: :ok + def start_availability_timer(public_key) do + GenStateMachine.cast(via_tuple(public_key), :start_timer) + end + # fetch cnnoection details from registery for a node from its public key defp via_tuple(public_key), do: {:via, Registry, {ConnectionRegistry, public_key}} @@ -64,13 +80,53 @@ defmodule Archethic.P2P.Client.Connection do transport: transport, request_id: 0, messages: %{}, - send_tasks: %{} + send_tasks: %{}, + availability_timer: {nil, 0} } actions = [{:next_event, :internal, :connect}] {:ok, :disconnected, data, actions} end + def handle_event( + {:call, from}, + {:get_timer, reset?}, + _state, + data = %{availability_timer: availability_timer} + ) do + time = + case availability_timer do + {nil, time} -> + time + + {start, time} -> + time + (System.monotonic_time(:second) - start) + end + + if reset? do + new_data = + Map.update!(data, :availability_timer, fn + {nil, _} -> + {nil, 0} + + _ -> + {System.monotonic_time(:second), 0} + end) + + {:keep_state, new_data, {:reply, from, time}} + else + {:keep_state_and_data, {:reply, from, time}} + end + end + + def handle_event(:cast, :start_timer, _state, data) do + {:keep_state, + Map.update!(data, :availability_timer, fn + {nil, time} -> {System.monotonic_time(:second), time} + timer -> timer + end)} + end + def handle_event(:enter, :disconnected, :disconnected, _data), do: :keep_state_and_data def handle_event( @@ -83,6 +139,19 @@ defmodule Archethic.P2P.Client.Connection do MemTable.decrease_node_availability(node_public_key) + # Stop availability timer + new_data = + data + |> Map.put(:messages, %{}) + |> Map.update!(:availability_timer, fn + {nil, time} -> + {nil, time} + + {start, previous_time} -> + added_time = System.monotonic_time(:second) - start + {nil, previous_time + added_time} + end) + # Notify clients the connection is lost # and cancel the existing timeouts actions = @@ -93,7 +162,21 @@ defmodule Archethic.P2P.Client.Connection do # Reconnect with backoff actions = [{{:timeout, :reconnect}, 500, nil} | actions] - {:keep_state, %{data | messages: %{}}, actions} + {:keep_state, new_data, actions} + end + + def handle_event(:enter, :disconnected, {:connected, _socket}, data) do + # Start availability timer + new_data = + Map.update!(data, :availability_timer, fn + {nil, time} -> + {System.monotonic_time(:second), time} + + timer -> + timer + end) + + {:keep_state, new_data} end def handle_event(:enter, _old_state, {:connected, _socket}, _data), do: :keep_state_and_data @@ -127,35 +210,6 @@ defmodule Archethic.P2P.Client.Connection do :keep_state_and_data end - def handle_event( - :cast, - {:connect, from}, - state, - data = %{ - ip: ip, - port: port, - transport: transport - } - ) do - next_state = - case state do - :disconnected -> - case transport.handle_connect(ip, port) do - {:ok, socket} -> - {:next_state, {:connected, socket}, data} - - {:error, _reason} -> - :keep_state_and_data - end - - _ -> - :keep_state_and_data - end - - send(from, :ok) - next_state - end - def handle_event( :cast, {:send_message, ref, from, _msg, _timeout}, @@ -247,6 +301,19 @@ defmodule Archethic.P2P.Client.Connection do message_id: msg_id ) + MemTable.decrease_node_availability(node_public_key) + + # Stop availability timer + new_data = + Map.update!(new_data, :availability_timer, fn + {nil, time} -> + {nil, time} + + {start, previous_time} -> + added_time = System.monotonic_time(:second) - start + {nil, previous_time + added_time} + end) + {:keep_state, new_data} {nil, _} -> @@ -320,6 +387,16 @@ defmodule Archethic.P2P.Client.Connection do {:ok, msg} -> MemTable.increase_node_availability(node_public_key) + # Start availability timer + new_data = + Map.update!(data, :availability_timer, fn + {nil, time} -> + {System.monotonic_time(:second), time} + + {start, time} -> + {start, time} + end) + start_decoding_time = System.monotonic_time() %MessageEnvelop{ @@ -337,7 +414,7 @@ defmodule Archethic.P2P.Client.Connection do end_time = System.monotonic_time() - case pop_in(data, [:messages, message_id]) do + case pop_in(new_data, [:messages, message_id]) do {%{ from: from, ref: ref, diff --git a/lib/archethic/p2p/client/default_impl.ex b/lib/archethic/p2p/client/default_impl.ex index d515b77df..981a13df7 100644 --- a/lib/archethic/p2p/client/default_impl.ex +++ b/lib/archethic/p2p/client/default_impl.ex @@ -67,4 +67,7 @@ defmodule Archethic.P2P.Client.DefaultImpl do end end end + + @impl Client + defdelegate get_availability_timer(public_key, reset?), to: Connection end diff --git a/lib/archethic/p2p/listener_protocol.ex b/lib/archethic/p2p/listener_protocol.ex index 3251ddd8b..b05aa4e51 100644 --- a/lib/archethic/p2p/listener_protocol.ex +++ b/lib/archethic/p2p/listener_protocol.ex @@ -52,6 +52,7 @@ defmodule Archethic.P2P.ListenerProtocol do ) Archethic.P2P.MemTable.increase_node_availability(sender_public_key) + Archethic.P2P.Client.Connection.start_availability_timer(sender_public_key) start_processing_time = System.monotonic_time() response = Archethic.P2P.Message.process(message) diff --git a/lib/archethic/utils.ex b/lib/archethic/utils.ex index 6b7ab2d3b..7e7549ff6 100644 --- a/lib/archethic/utils.ex +++ b/lib/archethic/utils.ex @@ -693,4 +693,39 @@ defmodule Archethic.Utils do end) end) end + + @doc """ + Get the median value from a list. + ## Examples + iex> Utils.median([]) + nil + iex> Utils.median([1,2,3]) + 2 + iex> Utils.median([1,2,3,4]) + 2.5 + """ + @spec median([number]) :: number | nil + def median([]), do: nil + + def median(list) when is_list(list) do + midpoint = + (length(list) / 2) + |> Float.floor() + |> round + + {l1, l2} = + Enum.sort(list) + |> Enum.split(midpoint) + + case length(l2) > length(l1) do + true -> + [med | _] = l2 + med + + false -> + [m1 | _] = l2 + [m2 | _] = Enum.reverse(l1) + (m1 + m2) / 2 + end + end end diff --git a/test/archethic/beacon_chain/slot_timer_test.exs b/test/archethic/beacon_chain/slot_timer_test.exs index c39290cf2..1d0c8bb00 100644 --- a/test/archethic/beacon_chain/slot_timer_test.exs +++ b/test/archethic/beacon_chain/slot_timer_test.exs @@ -117,6 +117,12 @@ defmodule Archethic.BeaconChain.SlotTimerTest do assert :gt == DateTime.compare(now, previous_slot_time) end + test "get_time_interval should return interval in second" do + SlotTimer.start_link([interval: "0 */10 * * * * *"], []) + + assert 600 == SlotTimer.get_time_interval() + end + describe "SlotTimer Behavior During start" do test "should wait for node :up message" do :persistent_term.put(:archethic_up, nil) diff --git a/test/archethic/beacon_chain/subset/p2p_sampling_test.exs b/test/archethic/beacon_chain/subset/p2p_sampling_test.exs index 6d13ae880..b03542966 100644 --- a/test/archethic/beacon_chain/subset/p2p_sampling_test.exs +++ b/test/archethic/beacon_chain/subset/p2p_sampling_test.exs @@ -37,7 +37,7 @@ defmodule Archethic.BeaconChain.Subset.P2PSamplingTest do assert [%Node{port: 3004}] = P2PSampling.list_nodes_to_sample(<<1>>) end - test "get_p2p_views/1 fetch p2p node availability and latency for the given list of nodes" do + test "get_p2p_views/2 fetch p2p node availability and latency for the given list of nodes" do nodes = [ %Node{ip: {127, 0, 0, 1}, port: 3001, first_public_key: "key1"}, %Node{ip: {127, 0, 0, 1}, port: 3002, first_public_key: "key2"}, @@ -45,6 +45,8 @@ defmodule Archethic.BeaconChain.Subset.P2PSamplingTest do %Node{ip: {127, 0, 0, 1}, port: 3004, first_public_key: "key4"} ] + node_availability_time = [600, 500, 365, 0] + MockClient |> stub(:new_connection, fn _, _, _, _ -> {:ok, self()} @@ -67,8 +69,8 @@ defmodule Archethic.BeaconChain.Subset.P2PSamplingTest do Enum.each(nodes, &P2P.add_and_connect_node/1) - assert [{true, node1_lat}, {true, node2_lat}, {true, node3_lat}, {false, 1000}] = - P2PSampling.get_p2p_views(nodes) + assert [{600, node1_lat}, {500, node2_lat}, {365, node3_lat}, {0, 0}] = + P2PSampling.get_p2p_views(nodes, node_availability_time) assert node1_lat < node2_lat assert node2_lat < node3_lat diff --git a/test/archethic/beacon_chain/subset/summary_cache_test.exs b/test/archethic/beacon_chain/subset/summary_cache_test.exs index 655c941e9..3696c2b3d 100644 --- a/test/archethic/beacon_chain/subset/summary_cache_test.exs +++ b/test/archethic/beacon_chain/subset/summary_cache_test.exs @@ -53,7 +53,7 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do } ], p2p_view: %{ - availabilities: <<1::1, 0::1>>, + availabilities: <<600::16, 0::16>>, network_stats: [ %{latency: 10}, %{latency: 0} diff --git a/test/archethic/beacon_chain/subset_test.exs b/test/archethic/beacon_chain/subset_test.exs index 1ba0efd0d..a34d649e3 100644 --- a/test/archethic/beacon_chain/subset_test.exs +++ b/test/archethic/beacon_chain/subset_test.exs @@ -247,6 +247,19 @@ defmodule Archethic.BeaconChain.SubsetTest do test "new summary is created when the slot time is the summary time", %{ subset: subset } do + MockClient + |> stub(:send_message, fn + _, %Ping{}, _ -> + Process.sleep(10) + {:ok, %Ok{}} + + _, %NewBeaconSlot{}, _ -> + {:ok, %Ok{}} + end) + + MockClient + |> stub(:get_availability_timer, fn _, _ -> 0 end) + summary_interval = "*/5 * * * *" start_supervised!({SummaryTimer, interval: summary_interval}) start_supervised!({SlotTimer, interval: "0 0 * * *"}) @@ -298,16 +311,6 @@ defmodule Archethic.BeaconChain.SubsetTest do fee: 0 } - MockClient - |> stub(:send_message, fn - _, %Ping{}, _ -> - Process.sleep(10) - {:ok, %Ok{}} - - _, %NewBeaconSlot{}, _ -> - {:ok, %Ok{}} - end) - send( pid, {:new_replication_attestation, %ReplicationAttestation{transaction_summary: tx_summary}} diff --git a/test/archethic/p2p/client/connection_test.exs b/test/archethic/p2p/client/connection_test.exs index c97e270f4..817a3198f 100644 --- a/test/archethic/p2p/client/connection_test.exs +++ b/test/archethic/p2p/client/connection_test.exs @@ -224,6 +224,134 @@ defmodule Archethic.P2P.Client.ConnectionTest do end end + describe "availability_timer" do + test "should start when node connect" do + {:ok, pid} = + Connection.start_link( + transport: __MODULE__.MockTransport, + ip: {127, 0, 0, 1}, + port: 3000, + node_public_key: "key1" + ) + + assert {{:connected, _socket}, %{availability_timer: {start, 0}}} = :sys.get_state(pid) + assert start != nil + end + + test "should stop and calculate time when the timeout is reached" do + {:ok, pid} = + Connection.start_link( + transport: __MODULE__.MockTransport, + ip: {127, 0, 0, 1}, + port: 3000, + node_public_key: Crypto.first_node_public_key() + ) + + assert {:error, :timeout} = + Connection.send_message( + Crypto.first_node_public_key(), + %GetBalance{address: <<0::8, :crypto.strong_rand_bytes(32)::binary>>}, + 1000 + ) + + Process.sleep(10) + + assert {_, %{availability_timer: {nil, 1}}} = :sys.get_state(pid) + + :ok = Connection.start_availability_timer(Crypto.first_node_public_key()) + + assert {_, %{availability_timer: {start, 1}}} = :sys.get_state(pid) + assert start != nil + + assert {:error, :timeout} = + Connection.send_message( + Crypto.first_node_public_key(), + %GetBalance{address: <<0::8, :crypto.strong_rand_bytes(32)::binary>>}, + 1000 + ) + + Process.sleep(10) + + assert {_, %{availability_timer: {nil, 2}}} = :sys.get_state(pid) + end + + test "should stop when node disconnect" do + defmodule MockTransportDisconnected3 do + alias Archethic.P2P.Client.Transport + + @behaviour Transport + + def handle_connect(_ip, _port) do + {:ok, make_ref()} + end + + def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok + + def handle_message({_, _, _}), do: {:error, :closed} + end + + {:ok, pid} = + Connection.start_link( + transport: __MODULE__.MockTransportDisconnected3, + ip: {127, 0, 0, 1}, + port: 3000, + node_public_key: Crypto.first_node_public_key() + ) + + assert {{:connected, _socket}, %{availability_timer: {start, 0}}} = :sys.get_state(pid) + assert start != nil + + msg_envelop = + %MessageEnvelop{ + message: %Balance{}, + message_id: 0, + sender_public_key: Crypto.first_node_public_key() + } + |> MessageEnvelop.encode(Crypto.first_node_public_key()) + + send(pid, {__MODULE__.MockTransportDisconnected, make_ref(), msg_envelop}) + + Process.sleep(10) + + assert {_, %{availability_timer: {nil, 0}}} = :sys.get_state(pid) + end + end + + describe "get_availability_timer" do + test "should return time value and reset timer" do + {:ok, pid} = + Connection.start_link( + transport: __MODULE__.MockTransport, + ip: {127, 0, 0, 1}, + port: 3000, + node_public_key: Crypto.first_node_public_key() + ) + + assert {:error, :timeout} = + Connection.send_message( + Crypto.first_node_public_key(), + %GetBalance{address: <<0::8, :crypto.strong_rand_bytes(32)::binary>>}, + 1000 + ) + + Process.sleep(10) + + assert {_, %{availability_timer: {nil, 1}}} = :sys.get_state(pid) + + assert 1 == Connection.get_availability_timer(Crypto.first_node_public_key(), true) + + assert {_, %{availability_timer: {nil, 0}}} = :sys.get_state(pid) + + :ok = Connection.start_availability_timer(Crypto.first_node_public_key()) + + assert {_, %{availability_timer: {start, 0}}} = :sys.get_state(pid) + + assert 0 == Connection.get_availability_timer(Crypto.first_node_public_key(), false) + + assert {_, %{availability_timer: {^start, 0}}} = :sys.get_state(pid) + end + end + defmodule MockTransport do alias Archethic.P2P.Client.Transport