Skip to content

Commit

Permalink
Set network coordinates from BeaconChain (#927)
Browse files Browse the repository at this point in the history
* Add the node origin of the slot in the summary cache

* Aggregate stats with weighed logistic regression

* Add fetch remote network stats

* Integrate network patches in the beacon summary

* Integrate network patches in summary aggregate

* Integrate network patch in the p2p summary dump

* Sort list for weight application

* Clean summary cache in the next 1st beacon slot

* Optimize network stats fetching

* Better cleaning of the previous summary slots

* lint

* Add code_change

* Fix summary patch serialization when starts by 0

* Fix beacon chain test

* Fallback code_change

* Aggregate mutliple stats

* Add stats collector to cache the parallel requests

* Simplify the network patch serialization

* Fix encoding/decoding of network patch digit

* Resolve missing beacon network stats

* Reduce debug trace

* Lint

* Create slot event only for authorized nodes

* Lint

* Use median in aggregation of conflicting patches

* Fix P2P edge case on bootstrap

* Fix serialize nb of subsets

* Set before? flag to true for all beacon election

* Fix Credo

* Fix format

* Add PubSub register in code change

---------

Co-authored-by: Neylix <julien.leclerc05@protonmail.com>
Co-authored-by: Neylix <julien@uniris.io>
  • Loading branch information
3 people committed Apr 11, 2023
1 parent a85be12 commit 9d8b4a8
Show file tree
Hide file tree
Showing 31 changed files with 1,773 additions and 251 deletions.
31 changes: 25 additions & 6 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Archethic.BeaconChain do
alias __MODULE__.Slot.Validation, as: SlotValidation
alias __MODULE__.SlotTimer
alias __MODULE__.Subset
alias __MODULE__.NetworkCoordinates
alias __MODULE__.Subset.P2PSampling
alias __MODULE__.Subset.SummaryCache
alias __MODULE__.Summary
Expand All @@ -28,6 +29,7 @@ defmodule Archethic.BeaconChain do
alias Archethic.P2P.Message.BeaconSummaryList
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Message.TransactionSummaryList

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain.TransactionSummary
Expand Down Expand Up @@ -110,8 +112,8 @@ defmodule Archethic.BeaconChain do
@doc """
Load a slot in summary cache
"""
@spec load_slot(Slot.t()) :: :ok | :error
def load_slot(slot = %Slot{subset: subset, slot_time: slot_time}) do
@spec load_slot(Slot.t(), Crypto.key()) :: :ok | :error
def load_slot(slot = %Slot{subset: subset, slot_time: slot_time}, node_public_key) do
if slot_time == SlotTimer.previous_slot(DateTime.utc_now()) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
case validate_slot(slot) do
Expand All @@ -120,7 +122,7 @@ defmodule Archethic.BeaconChain do
beacon_subset: Base.encode16(subset)
)

SummaryCache.add_slot(subset, slot)
SummaryCache.add_slot(subset, slot, node_public_key)

{:error, reason} ->
Logger.error("Invalid beacon slot - #{inspect(reason)}")
Expand Down Expand Up @@ -195,6 +197,10 @@ defmodule Archethic.BeaconChain do
@spec get_summary_slots(binary()) :: list(TransactionSummary.t())
def get_summary_slots(subset) when is_binary(subset) do
SummaryCache.stream_current_slots(subset)
|> Stream.map(fn
{slot, _} -> slot
slot -> slot
end)
|> Stream.flat_map(fn %Slot{transaction_attestations: transaction_attestations} ->
transaction_summaries =
transaction_attestations
Expand Down Expand Up @@ -252,7 +258,12 @@ defmodule Archethic.BeaconChain do
if unsubscribe?, do: Update.unsubscribe()

Enum.map(list_subsets(), fn subset ->
nodes = Election.beacon_storage_nodes(subset, date, P2P.authorized_and_available_nodes())
nodes =
Election.beacon_storage_nodes(
subset,
date,
P2P.authorized_and_available_nodes(date, true)
)

nodes =
Enum.reject(nodes, fn node -> node.first_public_key == Crypto.first_node_public_key() end)
Expand Down Expand Up @@ -324,10 +335,10 @@ defmodule Archethic.BeaconChain do
@spec list_transactions_summaries_from_current_slot(DateTime.t()) ::
list(TransactionSummary.t())
def list_transactions_summaries_from_current_slot(date = %DateTime{} \\ DateTime.utc_now()) do
authorized_nodes = P2P.authorized_and_available_nodes()

next_summary_date = next_summary_date(DateTime.truncate(date, :millisecond))

authorized_nodes = P2P.authorized_and_available_nodes(next_summary_date, true)

# get the subsets to request per node
list_subsets()
|> Enum.map(fn subset ->
Expand Down Expand Up @@ -472,4 +483,12 @@ defmodule Archethic.BeaconChain do
e
end
end

@doc """
Retrieve the network stats for a given subset from the cached slots
"""
@spec get_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()}
def get_network_stats(subset) when is_binary(subset) do
NetworkCoordinates.aggregate_network_stats(subset)
end
end
Loading

0 comments on commit 9d8b4a8

Please sign in to comment.