Skip to content

Commit

Permalink
Improve self-repair summary downloads (#400)
Browse files Browse the repository at this point in the history
* Group summaries fetch by node
* Stream beacon summaries
  • Loading branch information
Samuel committed Jun 27, 2022
1 parent 8670586 commit a453377
Show file tree
Hide file tree
Showing 11 changed files with 863 additions and 956 deletions.
114 changes: 104 additions & 10 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@ defmodule Archethic.BeaconChain do
to retrieve the beacon storage nodes involved.
"""

alias Archethic.BeaconChain.Slot
alias Archethic.BeaconChain.Slot.EndOfNodeSync
alias Archethic.BeaconChain.Slot.Validation, as: SlotValidation

alias Archethic.BeaconChain.SlotTimer
alias Archethic.BeaconChain.Subset
alias Archethic.BeaconChain.Subset.P2PSampling
alias Archethic.BeaconChain.Subset.SummaryCache
alias Archethic.BeaconChain.Summary
alias Archethic.BeaconChain.SummaryTimer
alias __MODULE__.Slot
alias __MODULE__.Slot.EndOfNodeSync
alias __MODULE__.Slot.Validation, as: SlotValidation
alias __MODULE__.SlotTimer
alias __MODULE__.Subset
alias __MODULE__.Subset.P2PSampling
alias __MODULE__.Subset.SummaryCache
alias __MODULE__.Summary
alias __MODULE__.SummaryAggregate
alias __MODULE__.SummaryTimer

alias Archethic.Crypto

alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Node
alias Archethic.P2P.Message.GetBeaconSummaries
alias Archethic.P2P.Message.BeaconSummaryList
alias Archethic.P2P.Message.RegisterBeaconUpdates

alias Archethic.TaskSupervisor
Expand Down Expand Up @@ -259,4 +261,96 @@ defmodule Archethic.BeaconChain do
})
end)
end

@doc ~S"""
Request from the beacon chains all the summaries for the given dates and aggregate them.

Returns one `SummaryAggregate` per summary time for which at least one valid summary
has been fetched, sorted chronologically by `summary_time`.

```
   [0, 1, ...]          Subsets
     /  |  \
    /   |   \
[ ]   [ ]   [ ]         Node election for each date to sync
 |\   /|\   /|
 | \ / | \ / |
 | / \ | / \ |
[ ]   [ ]   [ ]         Partition by node
 |     |     |
[ ]   [ ]   [ ]         Aggregate addresses
 |     |     |
[ ]   [ ]   [ ]         Fetch summaries
 |\   /|\   /|
 | \ / | \ / |
 | / \ | / \ |
[D1]  [D2]  [D3]        Partition by date
 |     |     |
[ ]   [ ]   [ ]         Aggregate and consolidate summaries
 \     |     /
   \   |   /
    \  |  /
     \ | /
      [ ]
```
"""
@spec fetch_summary_aggregates(list(DateTime.t()) | Enumerable.t()) ::
        list(SummaryAggregate.t())
def fetch_summary_aggregates(dates) do
  authorized_nodes = P2P.authorized_and_available_nodes()

  list_subsets()
  |> Flow.from_enumerable()
  |> Flow.flat_map(fn subset ->
    # For each subset (processed concurrently by the flow) we run the node
    # election of every date to sync
    Enum.flat_map(dates, &get_summary_address_by_node(&1, subset, authorized_nodes))
  end)
  # We partition by node
  |> Flow.partition(key: {:elem, 0})
  |> Flow.reduce(fn -> %{} end, fn {node, summary_address}, acc ->
    # We aggregate the addresses for a given node
    Map.update(acc, node, [summary_address], &[summary_address | &1])
  end)
  |> Flow.flat_map(fn {node, addresses} ->
    # For this node we fetch the summaries
    fetch_summaries(node, addresses)
  end)
  # We repartition by summary time to aggregate summaries for a date
  |> Flow.partition(key: {:key, :summary_time})
  |> Flow.reduce(
    fn -> %{} end,
    fn summary, aggregates_by_time ->
      # A partition only guarantees that every summary of a given date reaches
      # the same stage, not that a stage handles a single date. The state is
      # therefore keyed by summary time so that summaries of different dates
      # are never merged into the same aggregate.
      aggregate =
        aggregates_by_time
        |> Map.get(summary.summary_time, %SummaryAggregate{})
        |> SummaryAggregate.add_summary(summary)

      Map.put(aggregates_by_time, summary.summary_time, aggregate)
    end
  )
  |> Flow.emit(:state)
  # Each partition emits its whole state: a map of summary time => aggregate
  |> Stream.flat_map(&Map.values/1)
  |> Stream.reject(&SummaryAggregate.empty?/1)
  |> Stream.map(&SummaryAggregate.aggregate/1)
  # DateTime structs must be compared with DateTime.compare/2: the default term
  # ordering is structural and does not follow chronological order.
  |> Enum.sort_by(& &1.summary_time, {:asc, DateTime})
end

# Builds the `{node, summary_address}` pairs for one subset and one date.
#
# Only the nodes whose authorization date is strictly before `date` take part in
# the beacon storage node election; every elected node is paired with the beacon
# summary address derived for `subset` and `date`.
defp get_summary_address_by_node(date, subset, authorized_nodes) do
  eligible_nodes =
    Enum.filter(authorized_nodes, fn node ->
      DateTime.compare(node.authorization_date, date) == :lt
    end)

  address = Crypto.derive_beacon_chain_address(subset, date, true)

  for elected_node <- Election.beacon_storage_nodes(subset, date, eligible_nodes) do
    {elected_node, address}
  end
end

# Requests the beacon summaries stored at `addresses` from `node`.
#
# The addresses are requested in batches of 10, one message per batch. A batch
# whose reply is anything other than `{:ok, %BeaconSummaryList{}}` contributes
# no summary (best effort: other batches are still requested).
defp fetch_summaries(node, addresses) do
  batches = Enum.chunk_every(addresses, 10)

  Enum.flat_map(batches, fn batch ->
    request = %GetBeaconSummaries{addresses: batch}

    case P2P.send_message(node, request) do
      {:ok, %BeaconSummaryList{summaries: fetched}} -> fetched
      _ -> []
    end
  end)
end
end
157 changes: 157 additions & 0 deletions lib/archethic/beacon_chain/summary_aggregate.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
defmodule Archethic.BeaconChain.SummaryAggregate do
  @moduledoc """
  Represents an aggregate of multiple beacon summaries from multiple subsets for a given date.

  This helps the self-repair to maintain an aggregated and ordered view of the items to
  synchronize and to resolve.

  Summaries are first accumulated with `add_summary/2`, then consolidated with `aggregate/1`.
  """

  defstruct [:summary_time, transaction_summaries: [], p2p_availabilities: %{}]

  alias Archethic.Crypto

  alias Archethic.BeaconChain.ReplicationAttestation
  alias Archethic.BeaconChain.Summary, as: BeaconSummary

  alias Archethic.TransactionChain.TransactionSummary

  alias Archethic.Utils

  # The types below describe the aggregate once `aggregate/1` has been applied.
  # While summaries are being accumulated, `node_availabilities` and
  # `node_average_availabilities` hold one list per added summary.
  @type t :: %__MODULE__{
          summary_time: DateTime.t(),
          transaction_summaries: list(TransactionSummary.t()),
          p2p_availabilities: %{
            (subset :: binary()) => %{
              node_availabilities: bitstring(),
              node_average_availabilities: list(float()),
              end_of_node_synchronizations: list(Crypto.key())
            }
          }
        }

  @doc """
  Aggregate a new BeaconChain's summary.

  The summary is taken into account only if every one of its transaction attestations is
  accepted by `ReplicationAttestation.validate/1`; otherwise the aggregate is returned
  unchanged.

  When accepted:
  - the transaction summaries of the attestations are added to `transaction_summaries`
  - the P2P availabilities are appended to the entry of the summary's subset
  - `summary_time` is set from the summary if the aggregate did not have one yet
  """
  @spec add_summary(t(), BeaconSummary.t()) :: t()
  def add_summary(
        agg = %__MODULE__{},
        %BeaconSummary{
          subset: subset,
          summary_time: summary_time,
          transaction_attestations: transaction_attestations,
          node_availabilities: node_availabilities,
          node_average_availabilities: node_average_availabilities,
          end_of_node_synchronizations: end_of_node_synchronizations
        }
      ) do
    valid_attestations? =
      Enum.all?(transaction_attestations, &(ReplicationAttestation.validate(&1) == :ok))

    if valid_attestations? do
      agg
      |> Map.update!(:transaction_summaries, fn prev ->
        transaction_attestations
        |> Enum.map(& &1.transaction_summary)
        |> Enum.concat(prev)
      end)
      |> update_in(
        [
          Access.key(:p2p_availabilities, %{}),
          # First summary seen for this subset: start from empty accumulators
          Access.key(subset, %{
            node_availabilities: [],
            node_average_availabilities: [],
            end_of_node_synchronizations: []
          })
        ],
        fn prev ->
          prev
          # One row of bits (as integers) per summary, consolidated by aggregate/1
          |> Map.update!(
            :node_availabilities,
            &Enum.concat(&1, [Utils.bitstring_to_integer_list(node_availabilities)])
          )
          # One row of averages per summary, consolidated by aggregate/1
          |> Map.update!(
            :node_average_availabilities,
            &Enum.concat(&1, [node_average_availabilities])
          )
          |> Map.update!(
            :end_of_node_synchronizations,
            &Enum.concat(&1, end_of_node_synchronizations)
          )
        end
      )
      |> Map.update!(:summary_time, fn
        # The first accepted summary defines the time of the aggregate
        nil -> summary_time
        prev -> prev
      end)
    else
      agg
    end
  end

  @doc """
  Aggregate summaries batch.

  Consolidates what has been accumulated by `add_summary/2`:
  - transaction summaries are deduplicated by address and sorted by ascending timestamp
  - for each subset, the node availabilities are reduced to a single bitstring and the
    node average availabilities to a single list of averages
  - for each subset, the end of node synchronizations are deduplicated, as the same
    summary can be added several times (once per node which returned it)

  As the accumulated lists are replaced by their consolidated form, this function is
  meant to be applied once, after all the summaries have been added.
  """
  @spec aggregate(t()) :: t()
  def aggregate(agg = %__MODULE__{}) do
    agg
    |> Map.update!(:transaction_summaries, fn transactions ->
      transactions
      |> Enum.uniq_by(& &1.address)
      |> Enum.sort_by(& &1.timestamp, {:asc, DateTime})
    end)
    |> Map.update!(:p2p_availabilities, fn availabilities_by_subset ->
      Map.new(availabilities_by_subset, fn {subset, data} ->
        {subset,
         data
         |> Map.update!(:node_availabilities, &aggregate_node_availabilities/1)
         |> Map.update!(:node_average_availabilities, &aggregate_node_average_availabilities/1)
         |> Map.update!(:end_of_node_synchronizations, &Enum.uniq/1)}
      end)
    end)
  end

  # Reduces the rows of availability bits (one row per summary) to a single
  # bitstring: for each position the most frequent value wins, a tie being
  # resolved as available (1).
  #
  # NOTE(review): Enum.zip/1 stops at the shortest row, so positions missing from
  # at least one summary are dropped - confirm that all the rows of a subset are
  # expected to have the same length.
  defp aggregate_node_availabilities(node_availabilities) do
    node_availabilities
    |> Enum.zip()
    |> Enum.map(fn availabilities ->
      # Get the mode of the availabilities
      frequencies = availabilities |> Tuple.to_list() |> Enum.frequencies()
      online_frequencies = Map.get(frequencies, 1, 0)
      offline_frequencies = Map.get(frequencies, 0, 0)

      if online_frequencies >= offline_frequencies do
        <<1::1>>
      else
        <<0::1>>
      end
    end)
    |> :erlang.list_to_bitstring()
  end

  # Reduces the rows of average availabilities (one row per summary) to a single
  # list holding, for each position, the mean of the values rounded to 3 decimals.
  defp aggregate_node_average_availabilities(avg_availabilities) do
    avg_availabilities
    |> Enum.zip()
    |> Enum.map(fn node_averages ->
      values = Tuple.to_list(node_averages)
      Float.round(Enum.sum(values) / length(values), 3)
    end)
  end

  @doc """
  Determine when the aggregate is empty.

  An aggregate is empty when it holds neither transaction summaries nor P2P availabilities.
  """
  @spec empty?(t()) :: boolean()
  def empty?(%__MODULE__{transaction_summaries: [], p2p_availabilities: p2p_availabilities})
      when map_size(p2p_availabilities) == 0,
      do: true

  def empty?(%__MODULE__{}), do: false
end
Loading

0 comments on commit a453377

Please sign in to comment.