Skip to content

Commit

Permalink
Store beacon aggregate when notifier is triggered
Browse files Browse the repository at this point in the history
  • Loading branch information
Neylix committed Dec 1, 2022
1 parent 78495d4 commit 4e5ec54
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 20 deletions.
18 changes: 14 additions & 4 deletions lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ defmodule Archethic.P2P do
case nodes do
[] ->
# Only happen during bootstrap
get_first_enrolled_node()
case get_first_enrolled_node() do
nil -> []
node -> [node]
end

nodes ->
nodes
Expand Down Expand Up @@ -211,18 +214,25 @@ defmodule Archethic.P2P do
case nodes do
[] ->
# Only happen for init transactions so we take the first enrolled node
get_first_enrolled_node()
case get_first_enrolled_node() do
nil -> []
node -> [node]
end

nodes ->
nodes
end
end

defp get_first_enrolled_node() do
@doc """
Return the first enrolled node
"""
@spec get_first_enrolled_node() :: Node.t() | nil
def get_first_enrolled_node() do
list_nodes()
|> Enum.reject(&(&1.enrollment_date == nil))
|> Enum.sort_by(& &1.enrollment_date, {:asc, DateTime})
|> Enum.take(1)
|> List.first()
end

@doc """
Expand Down
10 changes: 8 additions & 2 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1873,10 +1873,16 @@ defmodule Archethic.P2P.Message do
) do
# Ensure all addresses are expected to be replicated
nodes = P2P.authorized_and_available_nodes()
addresses = [storage_address | io_addresses]

addresses =
if storage_address != nil, do: [storage_address | io_addresses], else: io_addresses

public_key = Crypto.first_node_public_key()

if Enum.all?(addresses, &(Election.storage_nodes(&1, nodes) |> Enum.member?(public_key))) do
if Enum.all?(
addresses,
&(Election.storage_nodes(&1, nodes) |> Utils.key_in_node_list?(public_key))
) do
case SelfRepair.repair_in_progress?(first_address) do
false ->
SelfRepair.start_worker(
Expand Down
72 changes: 66 additions & 6 deletions lib/archethic/self_repair/notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ defmodule Archethic.SelfRepair.Notifier do
"""

alias Archethic.{
BeaconChain,
Crypto,
Election,
P2P,
P2P.Node,
P2P.Message.ShardRepair,
TransactionChain,
TransactionChain.Transaction
TransactionChain.Transaction,
Utils
}

alias Archethic.TransactionChain.Transaction.{
Expand Down Expand Up @@ -59,7 +62,12 @@ defmodule Archethic.SelfRepair.Notifier do
prev_available_nodes = Keyword.fetch!(data, :prev_available_nodes)
new_available_nodes = Keyword.fetch!(data, :new_available_nodes)

Logger.info(
"Start Notifier due to a topology change #{inspect(Enum.map(unavailable_nodes, &Base.encode16(&1)))}"
)

repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes)
repair_summaries_aggregate(prev_available_nodes, new_available_nodes)

{:stop, :normal, data}
end
Expand All @@ -70,10 +78,6 @@ defmodule Archethic.SelfRepair.Notifier do
"""
@spec repair_transactions(list(Crypto.key()), list(Node.t()), list(Node.t())) :: :ok
def repair_transactions(unavailable_nodes, prev_available_nodes, new_available_nodes) do
Logger.debug(
"Trying to repair transactions due to a topology change #{inspect(Enum.map(unavailable_nodes, &Base.encode16(&1)))}"
)

# We fetch all the transactions existing and check if the disconnected nodes were in storage nodes
TransactionChain.stream_first_addresses()
|> Stream.reject(&network_chain?(&1))
Expand Down Expand Up @@ -231,7 +235,8 @@ defmodule Archethic.SelfRepair.Notifier do
acc,
fn {node_first_public_key, %{last_address: last_address, io_addresses: io_addresses}} ->
Logger.info(
"Send Shard Repair message to #{Base.encode16(node_first_public_key)} with storage_address #{Base.encode16(last_address)}, " <>
"Send Shard Repair message to #{Base.encode16(node_first_public_key)}" <>
"with storage_address #{if last_address, do: Base.encode16(last_address), else: nil}, " <>
"io_addresses #{inspect(Enum.map(io_addresses, &Base.encode16(&1)))}",
address: Base.encode16(first_address)
)
Expand All @@ -247,4 +252,59 @@ defmodule Archethic.SelfRepair.Notifier do
)
|> Stream.run()
end

@doc """
For each beacon aggregate, calculate the new election and store it if the node needs to
"""
@spec repair_summaries_aggregate(list(Node.t()), list(Node.t())) :: :ok
def repair_summaries_aggregate(prev_available_nodes, new_available_nodes) do
%Node{enrollment_date: first_enrollment_date} = P2P.get_first_enrolled_node()

first_enrollment_date
|> BeaconChain.next_summary_dates()
|> Stream.filter(&download?(&1, new_available_nodes))
|> Stream.chunk_every(20)
|> Stream.each(fn summary_times ->
Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
summary_times,
&download_and_store_summary(&1, prev_available_nodes),
ordered: false,
on_timeout: :kill_task
)
|> Stream.run()
end)
|> Stream.run()
end

defp download?(summary_time, new_available_nodes) do
in_new_election? =
summary_time
|> Crypto.derive_beacon_aggregate_address()
|> Election.chain_storage_nodes(new_available_nodes)
|> Utils.key_in_node_list?(Crypto.first_node_public_key())

if in_new_election? do
case BeaconChain.get_summaries_aggregate(summary_time) do
{:ok, _} -> false
{:error, _} -> true
end
else
false
end
end

defp download_and_store_summary(summary_time, prev_available_nodes) do
case BeaconChain.fetch_summaries_aggregate(summary_time, prev_available_nodes) do
{:ok, aggregate} ->
Logger.debug("Notifier store beacon aggregate for #{summary_time}")
BeaconChain.write_summaries_aggregate(aggregate)

error ->
Logger.warning(
"Notifier cannot fetch summary aggregate for date #{summary_time} " <>
"because of #{inspect(error)}"
)
end
end
end
2 changes: 1 addition & 1 deletion lib/archethic/self_repair/repair_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule Archethic.SelfRepair.RepairWorker do
Registry.register(RepairRegistry, first_address, [])

Logger.info(
"Notifier Repair Worker start with storage_address #{Base.encode16(storage_address)}, " <>
"Notifier Repair Worker start with storage_address #{if storage_address, do: Base.encode16(storage_address), else: nil}, " <>
"io_addresses #{inspect(Enum.map(io_addresses, &Base.encode16(&1)))}",
address: Base.encode16(first_address)
)
Expand Down
14 changes: 8 additions & 6 deletions lib/archethic/self_repair/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ defmodule Archethic.SelfRepair.Sync do
|> Enum.map(&update_availabilities(&1, availability_update))
|> DB.register_p2p_summary()

if :persistent_term.get(:archethic_up, nil) == :up do
new_available_nodes = P2P.authorized_and_available_nodes(availability_update)
new_available_nodes = P2P.authorized_and_available_nodes(availability_update)

if :persistent_term.get(:archethic_up, nil) == :up do
SelfRepair.start_notifier(
previous_available_nodes,
new_available_nodes,
Expand All @@ -280,7 +280,7 @@ defmodule Archethic.SelfRepair.Sync do

update_statistics(summary_time, transaction_summaries)

store_aggregate(aggregate)
store_aggregate(aggregate, new_available_nodes)
end

defp synchronize_transactions([], _), do: :ok
Expand Down Expand Up @@ -387,9 +387,11 @@ defmodule Archethic.SelfRepair.Sync do
PubSub.notify_new_tps(tps, nb_transactions)
end

defp store_aggregate(aggregate = %SummaryAggregate{summary_time: summary_time}) do
node_list =
[P2P.get_node_info() | P2P.authorized_and_available_nodes()] |> P2P.distinct_nodes()
defp store_aggregate(
aggregate = %SummaryAggregate{summary_time: summary_time},
new_available_nodes
) do
node_list = [P2P.get_node_info() | new_available_nodes] |> P2P.distinct_nodes()

should_store? =
summary_time
Expand Down
77 changes: 76 additions & 1 deletion test/archethic/self_repair/notifier_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ defmodule Archethic.SelfRepair.NotifierTest do

import Mox

alias Archethic.BeaconChain.SummaryAggregate
alias Archethic.BeaconChain.SummaryTimer

alias Archethic.Crypto

alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.GetBeaconSummariesAggregate
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Message.ShardRepair
alias Archethic.P2P.Node
Expand Down Expand Up @@ -92,7 +96,7 @@ defmodule Archethic.SelfRepair.NotifierTest do
assert ^expected = Notifier.map_last_addresses_for_node(tab)
end

test "repair_transactions/1 should send message to new storage nodes" do
test "repair_transactions/3 should send message to new storage nodes" do
node = %Node{
first_public_key: Crypto.first_node_public_key(),
last_public_key: Crypto.last_node_public_key(),
Expand Down Expand Up @@ -181,4 +185,75 @@ defmodule Archethic.SelfRepair.NotifierTest do
assert_receive :new_node
refute_receive :new_node, 200
end

test "repair_summaries_aggregate/2 should store beacon aggregate" do
enrollment_date = DateTime.utc_now() |> DateTime.add(-10, :minute)

node = %Node{
first_public_key: Crypto.first_node_public_key(),
last_public_key: Crypto.last_node_public_key(),
geo_patch: "AAA",
network_patch: "AAA",
authorized?: true,
authorization_date: DateTime.utc_now() |> DateTime.add(-11, :minute),
available?: true,
enrollment_date: enrollment_date,
availability_history: <<1::1>>
}

P2P.add_and_connect_node(node)

nodes =
Enum.map(1..9, fn nb ->
%Node{
first_public_key: "node#{nb}",
last_public_key: "node#{nb}",
geo_patch: "AAA",
network_patch: "AAA",
authorized?: true,
authorization_date: DateTime.utc_now(),
available?: true,
availability_history: <<1::1>>
}
end)

nodes = [node | nodes]

start_supervised!({SummaryTimer, interval: "0 * * * *"})

[first_date | rest] = SummaryTimer.next_summaries(enrollment_date) |> Enum.to_list()
random_date = Enum.random(rest)

me = self()

MockDB
|> stub(:get_beacon_summaries_aggregate, fn
summary_time when summary_time in [first_date, random_date] ->
{:error, :not_exists}

summary_time ->
{:ok, %SummaryAggregate{summary_time: summary_time}}
end)
|> expect(:write_beacon_summaries_aggregate, 2, fn
%SummaryAggregate{summary_time: summary_time} when summary_time == first_date ->
send(me, :write_first_date)

%SummaryAggregate{summary_time: summary_time} when summary_time == random_date ->
send(me, :write_random_date)

_ ->
send(me, :unexpected)
end)

MockClient
|> stub(:send_message, fn _, %GetBeaconSummariesAggregate{date: summary_time}, _ ->
{:ok, %SummaryAggregate{summary_time: summary_time}}
end)

Notifier.repair_summaries_aggregate(nodes, nodes)

assert_receive :write_first_date
assert_receive :write_random_date
refute_receive :unexpected
end
end

0 comments on commit 4e5ec54

Please sign in to comment.