Skip to content

Commit

Permalink
Reduce time to find the elected summary nodes (#117)
Browse files Browse the repository at this point in the history
- Flatten the summary times and subset
- Using Flow to partition and split computation by time window to
  improve latency
  • Loading branch information
Samuel committed Oct 14, 2021
1 parent 457b586 commit efb057a
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 125 deletions.
71 changes: 41 additions & 30 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,9 @@ defmodule ArchEthic.BeaconChain do
alias ArchEthic.TransactionChain.Transaction
alias ArchEthic.TransactionChain.TransactionData

require Logger
alias ArchEthic.Utils

@type pools ::
list({
subset :: binary(),
nodes_by_date :: list({DateTime.t(), list(Node.t())})
})
require Logger

@doc """
Initialize the beacon subsets (from 0 to 255 for a byte capacity)
Expand All @@ -65,24 +61,31 @@ defmodule ArchEthic.BeaconChain do
@doc """
Retrieve the beacon summaries storage nodes from a last synchronization date
"""
@spec get_summary_pools(DateTime.t()) :: pools()
@spec get_summary_pools(list(DateTime.t()), list(Node.t())) :: Enumerable.t()
def get_summary_pools(
last_sync_date = %DateTime{},
summary_times,
node_list \\ P2P.authorized_nodes()
) do
summary_times = SummaryTimer.previous_summaries(last_sync_date)

Enum.reduce(list_subsets(), [], fn subset, acc ->
nodes_by_summary_time =
Enum.map(summary_times, fn time ->
filter_nodes =
Enum.filter(node_list, &(DateTime.compare(&1.authorization_date, time) == :lt))
subsets = list_subsets()

{time, Election.beacon_storage_nodes(subset, time, filter_nodes)}
end)
window = Utils.flow_window_from_dates(summary_times, &elem(&1, 0))

[{subset, nodes_by_summary_time} | acc]
summary_times
|> Flow.from_enumerable()
|> Flow.flat_map(fn time ->
Enum.map(subsets, fn subset -> {DateTime.truncate(time, :second), subset} end)
end)
|> Flow.partition(window: window, key: {:elem, 1})
|> Flow.reduce(fn -> [] end, fn {time, subset}, acc ->
[{time, subset, get_beacon_chain_pool(time, subset, node_list)} | acc]
end)
|> Flow.emit(:state)
|> Stream.flat_map(& &1)
end

defp get_beacon_chain_pool(time, subset, node_list) do
filter_nodes = Enum.filter(node_list, &(DateTime.compare(&1.authorization_date, time) == :lt))
Election.beacon_storage_nodes(subset, time, filter_nodes)
end

@doc """
Expand All @@ -100,21 +103,23 @@ defmodule ArchEthic.BeaconChain do
@doc """
Retrieve the beacon slots storage nodes from a last synchronization date
"""
@spec get_slot_pools(DateTime.t(), list(Node.t())) :: pools()
def get_slot_pools(date = %DateTime{}, node_list \\ P2P.authorized_nodes()) do
slot_times = SlotTimer.previous_slots(date)
@spec get_slot_pools(list(DateTime.t()), list(Node.t())) :: Enumerable.t()
def get_slot_pools(slot_times, node_list \\ P2P.authorized_nodes()) do
subsets = list_subsets()

Enum.reduce(list_subsets(), [], fn subset, acc ->
nodes_by_slot_time =
Enum.map(slot_times, fn time ->
filter_nodes =
Enum.filter(node_list, &(DateTime.compare(&1.authorization_date, time) == :lt))
window = Utils.flow_window_from_dates(slot_times, &elem(&1, 0))

{time, Election.beacon_storage_nodes(subset, time, filter_nodes)}
end)

[{subset, nodes_by_slot_time} | acc]
slot_times
|> Flow.from_enumerable()
|> Flow.flat_map(fn time ->
Enum.map(subsets, fn subset -> {DateTime.truncate(time, :second), subset} end)
end)
|> Flow.partition(window: window, key: {:elem, 1})
|> Flow.reduce(fn -> [] end, fn {time, subset}, acc ->
[{time, subset, get_beacon_chain_pool(time, subset, node_list)} | acc]
end)
|> Flow.emit(:state)
|> Stream.flat_map(& &1)
end

@doc """
Expand Down Expand Up @@ -257,4 +262,10 @@ defmodule ArchEthic.BeaconChain do
{:error, :not_found}
end
end

@doc """
Return the previous summary datetimes from a given date
"""
@spec previous_summary_dates(DateTime.t()) :: Enumerable.t()
defdelegate previous_summary_dates(date), to: SummaryTimer, as: :previous_summaries
end
1 change: 1 addition & 0 deletions lib/archethic/self_repair/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ defmodule ArchEthic.SelfRepair.Sync do

defp missed_previous_summaries(last_sync_date, patch) do
last_sync_date
|> BeaconChain.previous_summary_dates()
|> BeaconChain.get_summary_pools()
|> BeaconSummaryHandler.get_beacon_summaries(patch)
end
Expand Down
149 changes: 84 additions & 65 deletions lib/archethic/self_repair/sync/beacon_summary_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,38 @@ defmodule ArchEthic.SelfRepair.Sync.BeaconSummaryHandler do
It request every subsets to find out the missing ones by querying beacon pool nodes.
"""
@spec get_beacon_summaries(BeaconChain.pools(), binary()) :: Enumerable.t()
@spec get_beacon_summaries(Enumerable.t(), binary()) :: Enumerable.t()
def get_beacon_summaries(summary_pools, patch) when is_binary(patch) do
Enum.map(summary_pools, fn {subset, nodes_by_summary_time} ->
Enum.map(nodes_by_summary_time, fn {summary_time, nodes} ->
{nodes, subset, summary_time}
end)
end)
|> :lists.flatten()
|> Task.async_stream(
fn {nodes, subset, summary_time} ->
beacon_address = Crypto.derive_beacon_chain_address(subset, summary_time, true)

beacon_address
|> download_summary(nodes, patch)
|> handle_summary_transaction(nodes, beacon_address)
end,
on_timeout: :kill_task,
max_concurrency: 256
window =
summary_pools
|> Stream.map(&elem(&1, 0))
|> Utils.flow_window_from_dates(&elem(&1, 0))

summary_pools
|> Flow.from_enumerable()
|> Flow.partition(
window: window,
key: {:elem, 1}
)
|> Enum.to_list()
|> Stream.filter(&match?({:ok, {:ok, %BeaconSummary{}}}, &1))
|> Stream.map(fn {:ok, {:ok, summary}} -> summary end)
|> Flow.map(fn {summary_time, subset, nodes} ->
beacon_address = Crypto.derive_beacon_chain_address(subset, summary_time, true)

beacon_address
|> download_summary(nodes, patch)
|> handle_summary_transaction(nodes, beacon_address)
end)
|> Flow.reduce(fn -> [] end, fn
{:ok, summary = %BeaconSummary{}}, acc -> [summary | acc]
_, acc -> acc
end)
|> Flow.emit(:state)
|> Stream.transform(fn -> [] end, fn
[], acc ->
{[], acc}

summaries, acc ->
{summaries, acc}
end)
end

def download_summary(beacon_address, nodes, patch, prev_result \\ nil)
Expand Down Expand Up @@ -162,48 +172,22 @@ defmodule ArchEthic.SelfRepair.Sync.BeaconSummaryHandler do
"""
@spec handle_missing_summaries(Enumerable.t() | list(BeaconSummary.t()), binary()) :: :ok
def handle_missing_summaries(summaries, node_patch) when is_binary(node_patch) do
%{
transactions: transactions,
ends_of_sync: ends_of_sync,
stats: stats,
p2p_availabilities: p2p_availabilities
} = reduce_summaries(summaries)

synchronize_transactions(transactions, node_patch)

Enum.each(ends_of_sync, fn %EndOfNodeSync{
public_key: node_public_key,
timestamp: timestamp
} ->
DB.register_p2p_summary(node_public_key, timestamp, true, 1.0)
P2P.set_node_globally_available(node_public_key)
end)

Enum.each(p2p_availabilities, fn
{%Node{first_public_key: node_key}, available?, avg_availability} ->
DB.register_p2p_summary(node_key, DateTime.utc_now(), available?, avg_availability)

if available? do
P2P.set_node_globally_available(node_key)
else
P2P.set_node_globally_unavailable(node_key)
end

P2P.set_node_average_availability(node_key, avg_availability)
end)

update_statistics(stats)
end

defp reduce_summaries(summaries) do
Enum.reduce(
summaries,
%{transactions: [], ends_of_sync: [], stats: %{}, p2p_availabilities: []},
&do_reduce_summary/2
)
|> Map.update!(:transactions, &List.flatten/1)
|> Map.update!(:ends_of_sync, &List.flatten/1)
|> Map.update!(:p2p_availabilities, &List.flatten/1)
initial_state = %{transactions: [], ends_of_sync: [], stats: %{}, p2p_availabilities: []}

window =
summaries
|> Stream.uniq_by(& &1.subset)
|> Stream.map(& &1.summary_time)
|> Utils.flow_window_from_dates(& &1.summary_time)

summaries
|> Flow.from_enumerable()
|> Flow.partition(window: window, key: {:key, :subset})
|> Flow.reduce(fn -> initial_state end, &do_reduce_summary/2)
|> Flow.emit(:state)
|> Flow.partition()
|> Flow.map(&process_summary_aggregate(&1, node_patch))
|> Flow.run()
end

defp do_reduce_summary(
Expand All @@ -215,12 +199,30 @@ defmodule ArchEthic.SelfRepair.Sync.BeaconSummaryHandler do
acc
) do
acc
|> Map.update!(:transactions, &[transaction_summaries | &1])
|> Map.update!(:ends_of_sync, &[ends_of_sync | &1])
|> Map.update!(:p2p_availabilities, &[BeaconSummary.get_node_availabilities(summary) | &1])
|> Map.update!(:transactions, &(&1 ++ transaction_summaries))
|> Map.update!(:ends_of_sync, &(&1 ++ ends_of_sync))
|> Map.update!(:p2p_availabilities, &(&1 ++ BeaconSummary.get_node_availabilities(summary)))
|> update_in([:stats, Access.key(summary_time, 0)], &(&1 + length(transaction_summaries)))
end

defp process_summary_aggregate(
%{
transactions: transactions,
ends_of_sync: ends_of_sync,
stats: stats,
p2p_availabilities: p2p_availabilities
},
node_patch
) do
synchronize_transactions(transactions, node_patch)

Enum.each(ends_of_sync, &handle_end_of_node_sync/1)

Enum.each(p2p_availabilities, &update_availabilities(elem(&1, 0), elem(&1, 1), elem(&1, 2)))

update_statistics(stats)
end

defp synchronize_transactions(transaction_summaries, node_patch) do
transactions_to_sync =
transaction_summaries
Expand All @@ -234,6 +236,23 @@ defmodule ArchEthic.SelfRepair.Sync.BeaconSummaryHandler do
Enum.each(transactions_to_sync, &TransactionHandler.download_transaction(&1, node_patch))
end

defp handle_end_of_node_sync(%EndOfNodeSync{public_key: node_public_key, timestamp: timestamp}) do
DB.register_p2p_summary(node_public_key, timestamp, true, 1.0)
P2P.set_node_globally_available(node_public_key)
end

defp update_availabilities(%Node{first_public_key: node_key}, available?, avg_availability) do
DB.register_p2p_summary(node_key, DateTime.utc_now(), available?, avg_availability)

if available? do
P2P.set_node_globally_available(node_key)
else
P2P.set_node_globally_unavailable(node_key)
end

P2P.set_node_average_availability(node_key, avg_availability)
end

defp update_statistics(stats) do
Enum.each(stats, fn {date, nb_transactions} ->
previous_summary_time =
Expand Down
20 changes: 20 additions & 0 deletions lib/archethic/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -544,4 +544,24 @@ defmodule ArchEthic.Utils do
div(milliseconds, 1000)
end
end

@doc """
Return a Flow.Window.fixed/3 from a list of dates by computing the interval between
"""
@spec flow_window_from_dates(Enumerable.t(), (any() -> integer())) :: Flow.Window.t()
def flow_window_from_dates(dates, mapper) when is_function(mapper) do
case Enum.take(dates, 2) do
[date1, date2] ->
diff = DateTime.diff(date2, date1) |> abs()

if diff > 0 do
Flow.Window.fixed(diff, :second, &DateTime.to_unix(mapper.(&1), :millisecond))
else
Flow.Window.global()
end

_ ->
Flow.Window.global()
end
end
end
Loading

0 comments on commit efb057a

Please sign in to comment.