Skip to content

Commit

Permalink
Fix beacon summary aggregation in self repair (#294)
Browse files Browse the repository at this point in the history
* Improve next summary date for Beacon summary time
* Fix ordering of transactions in self repair
* Fix clean db task output with multiple local nodes
  • Loading branch information
Samuel authored and Samuel committed May 6, 2022
1 parent c0aa6f5 commit 172539d
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 79 deletions.
20 changes: 14 additions & 6 deletions lib/archethic/beacon_chain/summary_timer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@ defmodule ArchEthic.BeaconChain.SummaryTimer do
end

@doc """
Give the next beacon chain slot using the `SlotTimer` interval
Give the next beacon chain slot using the `SummaryTimer` interval
"""

def next_summary(date_from = %DateTime{}) do
get_interval()
|> CronParser.parse!(true)
|> CronScheduler.get_next_run_date!(DateTime.to_naive(date_from))
|> DateTime.from_naive!("Etc/UTC")
cron_expression = CronParser.parse!(get_interval(), true)
naive_date_from = DateTime.to_naive(date_from)

if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do
cron_expression
|> CronScheduler.get_next_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
else
cron_expression
|> CronScheduler.get_next_run_date!(naive_date_from)
|> DateTime.from_naive!("Etc/UTC")
end
end

@doc """
Expand Down
54 changes: 27 additions & 27 deletions lib/archethic/self_repair/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,23 @@ defmodule ArchEthic.SelfRepair.Sync do
|> BeaconChain.next_summary_dates()
|> Flow.from_enumerable()
|> Flow.flat_map(&subsets_by_times/1)
|> Flow.partition(key: {:elem, 0})
|> Flow.reduce(
fn -> %BeaconSummaryAggregate{} end,
&aggregate_summaries_by_date(&1, &2, authorized_nodes)
)
|> Flow.emit(:state)
|> Flow.partition(key: {:elem, 0}, window: flow_window())
|> Flow.reduce(fn -> [] end, fn {time, subset}, acc ->
summary = get_beacon_summary(time, subset, authorized_nodes)

if BeaconSummary.empty?(summary) do
acc
else
[summary | acc]
end
end)
|> Flow.on_trigger(fn acc, _, {:fixed, time, :done} ->
agg = %BeaconSummaryAggregate{
summary_time: DateTime.from_unix!(time, :millisecond)
}

{[Enum.reduce(acc, agg, &BeaconSummaryAggregate.add_summary(&2, &1))], acc}
end)
|> Stream.reject(&BeaconSummaryAggregate.empty?/1)
|> Enum.sort_by(& &1.summary_time)
|> Enum.each(&BeaconSummaryHandler.process_summary_aggregate(&1, patch))
Expand All @@ -123,33 +134,22 @@ defmodule ArchEthic.SelfRepair.Sync do
Enum.map(subsets, fn subset -> {DateTime.truncate(time, :second), subset} end)
end

# defp flow_window do
# Flow.Window.fixed(, :second, fn {date, _} ->
# DateTime.to_unix(date, :millisecond)
# end)
# end
defp flow_window do
Flow.Window.fixed(summary_interval(:second), :second, fn {date, _} ->
DateTime.to_unix(date, :millisecond)
end)
end

# defp dates_interval_seconds(last_sync_date) do
# DateTime.diff(last_sync_date, BeaconChain.next_summary_date(last_sync_date))
# end
defp summary_interval(unit) do
next_summary = BeaconChain.next_summary_date(DateTime.utc_now())
next_summary2 = BeaconChain.next_summary_date(next_summary)
DateTime.diff(next_summary2, next_summary, unit)
end

defp get_beacon_summary(time, subset, node_list) do
filter_nodes = Enum.filter(node_list, &(DateTime.compare(&1.authorization_date, time) == :lt))

nodes = Election.beacon_storage_nodes(subset, time, filter_nodes)
BeaconSummaryHandler.get_full_beacon_summary(time, subset, nodes)
end

defp aggregate_summaries_by_date({time, subset}, acc, authorized_nodes) do
summary = get_beacon_summary(time, subset, authorized_nodes)

if BeaconSummary.empty?(summary) do
acc
else
acc
|> BeaconSummaryAggregate.initialize(summary)
|> BeaconSummaryAggregate.add_transaction_summaries(summary)
|> BeaconSummaryAggregate.add_p2p_availabilities(summary)
end
end
end
55 changes: 22 additions & 33 deletions lib/archethic/self_repair/sync/beacon_aggregate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule ArchEthic.SelfRepair.Sync.BeaconSummaryAggregate do

defstruct [:summary_time, transaction_summaries: [], p2p_availabilities: %{}]

alias ArchEthic.BeaconChain.ReplicationAttestation
alias ArchEthic.BeaconChain.Summary, as: BeaconSummary
alias ArchEthic.TransactionChain.TransactionSummary

Expand All @@ -23,42 +22,32 @@ defmodule ArchEthic.SelfRepair.Sync.BeaconSummaryAggregate do
}

@doc """
Initialize the aggregate by defining the summary time for example
If the aggregate is already initialized, the call will be skipped
Aggregate a new BeaconChain's summary
"""
@spec initialize(t(), BeaconSummary.t()) :: t()
def initialize(
aggregate = %__MODULE__{summary_time: nil},
%BeaconSummary{summary_time: summary_time}
@spec add_summary(t(), BeaconSummary.t()) :: t()
def add_summary(
agg = %__MODULE__{},
%BeaconSummary{
subset: subset,
transaction_attestations: attestations,
node_availabilities: node_availabilities,
node_average_availabilities: node_average_availabilities
}
) do
%{aggregate | summary_time: summary_time}
end
transaction_summaries =
attestations
|> Enum.map(& &1.transaction_summary)
|> Enum.concat(agg.transaction_summaries)
|> Enum.uniq_by(& &1.address)
|> Enum.sort_by(& &1.timestamp, {:asc, DateTime})

def initialize(
aggregate = %__MODULE__{},
_summary
) do
aggregate
end
p2p_availabilities =
Map.put(agg.p2p_availabilities, subset, %{
node_availabilities: node_availabilities,
node_average_availabilities: node_average_availabilities
})

@doc """
Add a transaction summaries to the aggregate by providing uniqueness and sorting
"""
@spec add_transaction_summaries(t(), BeaconSummary.t()) :: t()
def add_transaction_summaries(
aggregate = %__MODULE__{},
%BeaconSummary{transaction_attestations: transaction_attestations}
) do
transaction_attestations
|> Enum.reduce(aggregate, fn %ReplicationAttestation{transaction_summary: transaction_summary},
acc ->
Map.update!(acc, :transaction_summaries, fn transaction_summaries ->
[transaction_summary | transaction_summaries]
|> Enum.uniq_by(& &1.address)
|> Enum.sort_by(& &1.timestamp, {:asc, DateTime})
end)
end)
%{agg | transaction_summaries: transaction_summaries, p2p_availabilities: p2p_availabilities}
end

@doc """
Expand Down
1 change: 1 addition & 0 deletions lib/archethic/self_repair/sync/beacon_summary_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ defmodule ArchEthic.SelfRepair.Sync.BeaconSummaryHandler do
)
|> Enum.filter(&match?({:ok, {:ok, %BeaconSummary{}}}, &1))
|> Enum.map(fn {:ok, {:ok, summary}} -> summary end)
|> Enum.reject(&BeaconSummary.empty?/1)
|> Enum.reduce(
%{
transaction_attestations: [],
Expand Down
14 changes: 6 additions & 8 deletions lib/mix/tasks/clean_db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ defmodule Mix.Tasks.ArchEthic.CleanDb do
use Mix.Task

def run(_arg) do
files_to_remove =
["_build", "dev", "lib", "archethic", "data_*"]
|> Path.join()
|> Path.wildcard()

IO.puts("#{files_to_remove} will be removed")

Enum.each(files_to_remove, &File.rm_rf!/1)
"_build/dev/lib/archethic/data*"
|> Path.wildcard()
|> Enum.each(fn path ->
IO.puts("#{path} will be removed")
File.rm_rf!(path)
end)

IO.puts("Database dropped")
end
Expand Down
19 changes: 14 additions & 5 deletions test/archethic/beacon_chain/summary_timer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,20 @@ defmodule ArchEthic.BeaconChain.SummaryTimerTest do

alias ArchEthic.BeaconChain.SummaryTimer

test "next_summary/2 should get the next summary time from a given date" do
{:ok, _pid} = SummaryTimer.start_link([interval: "0 * * * * * *"], [])
now = ~U[2021-01-02 03:00:19Z]
next_summary_time = SummaryTimer.next_summary(now)
assert 1 == abs(now.minute - next_summary_time.minute)
describe "next_summary/2" do
test "should get the next summary time from a given date" do
{:ok, _pid} = SummaryTimer.start_link([interval: "0 * * * * * *"], [])
now = ~U[2021-01-02 03:00:19.501Z]
next_summary_time = SummaryTimer.next_summary(now)
assert 1 == abs(now.minute - next_summary_time.minute)
end

test "should get the 2nd next summary time when the date is an summary interval date" do
{:ok, _pid} = SummaryTimer.start_link([interval: "0 * * * * * *"], [])
next_date = SummaryTimer.next_summary(DateTime.utc_now())
next_summary_time = SummaryTimer.next_summary(next_date)
assert DateTime.compare(next_summary_time, next_date) == :gt
end
end

property "previous_summaries/1 should retrieve the previous summary times from a date" do
Expand Down

0 comments on commit 172539d

Please sign in to comment.