From 0f210f4e8145c60e4ce69dc22833ab70100a6325 Mon Sep 17 00:00:00 2001 From: Neylix Date: Mon, 6 Mar 2023 19:15:54 +0100 Subject: [PATCH] Verify attestation confirmations threshold if refused, attestation is postponed to next summary --- .../beacon_chain/replication_attestation.ex | 85 +++++++++++++++ .../beacon_chain/summary_aggregate.ex | 42 ++++++- lib/archethic_web/live/chains/beacon_live.ex | 1 + .../replication_attestation_test.exs | 103 +++++++++++++++++- .../beacon_chain/summary_aggregate_test.exs | 83 +++++++++++++- test/archethic/beacon_chain_test.exs | 29 ++--- .../archethic/bootstrap/network_init_test.exs | 2 + 7 files changed, 316 insertions(+), 29 deletions(-) diff --git a/lib/archethic/beacon_chain/replication_attestation.ex b/lib/archethic/beacon_chain/replication_attestation.ex index 5537865f24..1c2cba8492 100644 --- a/lib/archethic/beacon_chain/replication_attestation.ex +++ b/lib/archethic/beacon_chain/replication_attestation.ex @@ -5,7 +5,10 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do alias Archethic.Crypto + alias Archethic.BeaconChain.SummaryTimer + alias Archethic.Election + alias Archethic.Election.HypergeometricDistribution alias Archethic.P2P alias Archethic.P2P.Message.GetTransactionSummary @@ -23,6 +26,11 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do confirmations: list({position :: non_neg_integer(), signature :: binary()}) } + # Minimum 10 nodes to start verifying the threshold + @minimum_nodes_for_threshold 10 + # Minimum 35% of the expected confirmations must be present + @confirmations_threshold 0.35 + @doc """ Serialize a replication attestation @@ -240,6 +248,83 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do ) end + @doc """ + Filter a list of replication attestation to keep only the one that reached the + minimum confirmations threshold + """ + @spec filter_reached_threshold(Enumerable.t(t()), DateTime.t()) :: map() + def filter_reached_threshold(attestations, summary_time) do + # For security reason we reject the attestation with less than 35% of expected confirmations + # It may be due to latency, so we forward the attestation for next summary + + # In a summary there is maybe attestations from last summary + previous_summary_time = summary_time |> SummaryTimer.previous_summary() + + # replicas_count = 0 when we allow any confirmations number + previous_replicas_count = + with nb_nodes when nb_nodes > 0 <- + P2P.authorized_and_available_nodes(previous_summary_time, true) |> length(), + expected_number <- HypergeometricDistribution.run_simulation(nb_nodes), + true <- expected_number > @minimum_nodes_for_threshold do + expected_number + else + _ -> 0 + end + + current_replicas_count = + with nb_nodes when nb_nodes > 0 <- + P2P.authorized_and_available_nodes(summary_time, true) |> length(), + expected_number <- HypergeometricDistribution.run_simulation(nb_nodes), + true <- expected_number > @minimum_nodes_for_threshold do + expected_number + else + _ -> 0 + end + + if previous_replicas_count + current_replicas_count == 0 do + # Nothing to filter + %{accepted: attestations, refused: []} + else + do_filter_reached_threshold( + attestations, + previous_replicas_count, + current_replicas_count, + previous_summary_time + ) + end + end + + defp do_filter_reached_threshold( + attestations, + previous_replicas_count, + current_replicas_count, + previous_summary_time + ) do + Enum.reduce( + attestations, + %{accepted: [], refused: []}, + fn attestation = %__MODULE__{ + transaction_summary: %TransactionSummary{timestamp: timestamp}, + confirmations: confirmations + }, + acc -> + replicas_count = + if DateTime.compare(timestamp, previous_summary_time) == :lt, + do: previous_replicas_count, + else: current_replicas_count + + if length(confirmations) >= replicas_count * @confirmations_threshold do + # Confirmations reached threshold we accept the attestation in the summary + Map.update!(acc, :accepted, &[attestation | &1]) + else + # Confirmations didn't reached threshold, we postpone attestation in next summary + Map.update!(acc, :refused, &[attestation | &1]) + end + end + ) + |> Map.update!(:accepted, &Enum.reverse/1) + end + defp check_transaction_summary(nodes, expected_summary, timeout \\ 500) defp check_transaction_summary([], _, _), do: {:error, :network_issue} diff --git a/lib/archethic/beacon_chain/summary_aggregate.ex b/lib/archethic/beacon_chain/summary_aggregate.ex index f8a23ae0f2..d976dd3086 100644 --- a/lib/archethic/beacon_chain/summary_aggregate.ex +++ b/lib/archethic/beacon_chain/summary_aggregate.ex @@ -20,8 +20,12 @@ defmodule Archethic.BeaconChain.SummaryAggregate do alias Archethic.TransactionChain.TransactionSummary + alias Archethic.PubSub + alias Archethic.Utils + require Logger + @availability_adding_time :archethic |> Application.compile_env!(Archethic.SelfRepair.Scheduler) |> Keyword.fetch!(:availability_application) @@ -62,9 +66,7 @@ defmodule Archethic.BeaconChain.SummaryAggregate do fn prev -> transaction_attestations |> Enum.filter(&(ReplicationAttestation.validate(&1, false) == :ok)) - |> Enum.map(& &1.transaction_summary) |> Enum.concat(prev) - |> Enum.uniq_by(& &1.address) end ) |> Map.update!(:availability_adding_time, &[availability_adding_time | &1]) @@ -106,11 +108,15 @@ defmodule Archethic.BeaconChain.SummaryAggregate do Aggregate summaries batch """ @spec aggregate(t()) :: t() - def aggregate(agg = %__MODULE__{}) do + def aggregate(agg = %__MODULE__{summary_time: summary_time}) do agg - |> Map.update!(:transaction_summaries, fn transactions -> - transactions - |> Enum.uniq_by(& &1.address) + |> Map.update!(:transaction_summaries, fn attestations -> + # Aggregate all confirmations, then filter the attestations that reached + # the threshold. Postpone to next summary the attestations that didn't reach the threshold + attestations + |> ReplicationAttestation.reduce_confirmations() + |> filter_reached_threshold(summary_time) + |> Enum.map(& &1.transaction_summary) |> Enum.sort_by(& &1.timestamp, {:asc, DateTime}) end) |> Map.update!(:availability_adding_time, fn @@ -133,6 +139,30 @@ defmodule Archethic.BeaconChain.SummaryAggregate do end) end + defp filter_reached_threshold(attestations, summary_time) do + %{accepted: accepted_attestations, refused: refused_attestations} = + ReplicationAttestation.filter_reached_threshold(attestations, summary_time) + + Enum.each( + refused_attestations, + fn attestation = %ReplicationAttestation{ + transaction_summary: %TransactionSummary{address: address, type: type}, + confirmations: confirmations + } -> + Logger.debug( + "Attestation postponed to next summary with #{length(confirmations)} confirmations", + transaction_address: Base.encode16(address), + transaction_type: type + ) + + # Notification will be catched by subset and add the attestation in current Slot + PubSub.notify_replication_attestation(attestation) + end + ) + + accepted_attestations + end + defp aggregate_node_availabilities(node_availabilities) do node_availabilities |> Enum.zip() diff --git a/lib/archethic_web/live/chains/beacon_live.ex b/lib/archethic_web/live/chains/beacon_live.ex index 10f8933238..ebb6f448fc 100644 --- a/lib/archethic_web/live/chains/beacon_live.ex +++ b/lib/archethic_web/live/chains/beacon_live.ex @@ -224,6 +224,7 @@ defmodule ArchethicWeb.BeaconChainLive do defp list_transactions_from_summaries(date = %DateTime{}) do %SummaryAggregate{transaction_summaries: tx_summaries} = Archethic.fetch_and_aggregate_summaries(date) + |> SummaryAggregate.aggregate() Enum.sort_by(tx_summaries, & &1.timestamp, {:desc, DateTime}) end diff --git a/test/archethic/beacon_chain/replication_attestation_test.exs b/test/archethic/beacon_chain/replication_attestation_test.exs index 382d393ccb..8865db89f3 100644 --- a/test/archethic/beacon_chain/replication_attestation_test.exs +++ b/test/archethic/beacon_chain/replication_attestation_test.exs @@ -1,8 +1,109 @@ defmodule Archethic.BeaconChain.ReplicationAttestationTest do - use ExUnit.Case + use ArchethicCase alias Archethic.BeaconChain.ReplicationAttestation + alias Archethic.BeaconChain.SummaryTimer + alias Archethic.TransactionChain.TransactionSummary + alias Archethic.P2P + alias Archethic.P2P.Node + doctest ReplicationAttestation + + describe "filter_unreached_threshold/2" do + setup do + # Summary timer each hour + start_supervised!({SummaryTimer, interval: "0 0 * * * *"}) + # Create 10 nodes on last summary + Enum.each(0..9, fn i -> + P2P.add_and_connect_node(%Node{ + ip: {88, 130, 19, i}, + port: 3000 + i, + last_public_key: :crypto.strong_rand_bytes(32), + first_public_key: :crypto.strong_rand_bytes(32), + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-1, :hour), + enrollment_date: DateTime.utc_now() |> DateTime.add(-2, :hour) + }) + end) + + # Add two node in the current summary + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + last_public_key: :crypto.strong_rand_bytes(32), + first_public_key: :crypto.strong_rand_bytes(32), + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: DateTime.utc_now(), + enrollment_date: DateTime.utc_now() + }) + + # Add two node in the current summary + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 2}, + port: 3001, + last_public_key: :crypto.strong_rand_bytes(32), + first_public_key: :crypto.strong_rand_bytes(32), + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: DateTime.utc_now(), + enrollment_date: DateTime.utc_now() + }) + end + + test "should not remove attestation if minimum node is not reached" do + attestations = [ + # First Replication with enough threshold + %ReplicationAttestation{ + transaction_summary: %TransactionSummary{ + timestamp: DateTime.utc_now() |> DateTime.add(-1, :hour) + }, + confirmations: Enum.map(0..9, &{&1, "signature#{&1}"}) + }, + # Second Replication without enough threshold + %ReplicationAttestation{ + transaction_summary: %TransactionSummary{ + timestamp: DateTime.utc_now() |> DateTime.add(-1, :hour) + }, + confirmations: Enum.map(0..2, &{&1, "signature#{&1}"}) + } + ] + + summary_time = DateTime.utc_now() |> SummaryTimer.next_summary() + + assert %{accepted: ^attestations, refused: []} = + ReplicationAttestation.filter_reached_threshold(attestations, summary_time) + end + + test "should remove attestation if threshold is not reached" do + # First Replication with enough threshold + attestation1 = %ReplicationAttestation{ + transaction_summary: %TransactionSummary{ + timestamp: DateTime.utc_now() + }, + confirmations: Enum.map(0..9, &{&1, "signature#{&1}"}) + } + + # Second Replication without enough threshold + attestation2 = %ReplicationAttestation{ + transaction_summary: %TransactionSummary{ + timestamp: DateTime.utc_now() + }, + confirmations: Enum.map(0..2, &{&1, "signature#{&1}"}) + } + + attestations = [attestation1, attestation2] + + summary_time = DateTime.utc_now() |> SummaryTimer.next_summary() + + assert %{accepted: [^attestation1], refused: [^attestation2]} = + ReplicationAttestation.filter_reached_threshold(attestations, summary_time) + end + end end diff --git a/test/archethic/beacon_chain/summary_aggregate_test.exs b/test/archethic/beacon_chain/summary_aggregate_test.exs index 4764391b7b..dd313de6dc 100644 --- a/test/archethic/beacon_chain/summary_aggregate_test.exs +++ b/test/archethic/beacon_chain/summary_aggregate_test.exs @@ -1,8 +1,89 @@ defmodule Archethic.BeaconChain.SummaryAggregateTest do - use ExUnit.Case + use ArchethicCase + alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.BeaconChain.SummaryAggregate + alias Archethic.BeaconChain.SummaryTimer alias Archethic.TransactionChain.TransactionSummary + alias Archethic.P2P + alias Archethic.P2P.Node + + alias Archethic.PubSub + doctest SummaryAggregate + + test "aggregate_slots should postpone attestation if refused" do + # Summary timer each hour + start_supervised!({SummaryTimer, interval: "0 0 * * * *"}) + # Create 10 nodes on last summary + Enum.each(0..9, fn i -> + P2P.add_and_connect_node(%Node{ + ip: {88, 130, 19, i}, + port: 3000 + i, + last_public_key: :crypto.strong_rand_bytes(32), + first_public_key: :crypto.strong_rand_bytes(32), + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-1, :hour), + enrollment_date: DateTime.utc_now() |> DateTime.add(-2, :hour) + }) + end) + + # Add two node in the current summary + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + last_public_key: :crypto.strong_rand_bytes(32), + first_public_key: :crypto.strong_rand_bytes(32), + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: DateTime.utc_now(), + enrollment_date: DateTime.utc_now() + }) + + # Add two node in the current summary + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 2}, + port: 3001, + last_public_key: :crypto.strong_rand_bytes(32), + first_public_key: :crypto.strong_rand_bytes(32), + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: DateTime.utc_now(), + enrollment_date: DateTime.utc_now() + }) + + # First Replication with enough threshold + attestation1 = %ReplicationAttestation{ + transaction_summary: %TransactionSummary{ + address: :crypto.strong_rand_bytes(32), + timestamp: DateTime.utc_now() + }, + confirmations: Enum.map(0..9, &{&1, "signature#{&1}"}) + } + + # Second Replication without enough threshold + attestation2 = %ReplicationAttestation{ + transaction_summary: %TransactionSummary{ + address: :crypto.strong_rand_bytes(32), + timestamp: DateTime.utc_now() + }, + confirmations: Enum.map(0..2, &{&1, "signature#{&1}"}) + } + + attestations = [attestation1, attestation2] + + summary_time = DateTime.utc_now() |> SummaryTimer.next_summary() + + PubSub.register_to_new_replication_attestations() + + %SummaryAggregate{summary_time: summary_time, transaction_summaries: attestations} + |> SummaryAggregate.aggregate() + + assert_receive {:new_replication_attestation, ^attestation2} + end end diff --git a/test/archethic/beacon_chain_test.exs b/test/archethic/beacon_chain_test.exs index 9f1b4006d5..6d54efde2f 100644 --- a/test/archethic/beacon_chain_test.exs +++ b/test/archethic/beacon_chain_test.exs @@ -6,6 +6,7 @@ defmodule Archethic.BeaconChainTest do alias Archethic.BeaconChain.Slot alias Archethic.BeaconChain.Slot.EndOfNodeSync alias Archethic.BeaconChain.SlotTimer + alias Archethic.BeaconChain.SummaryTimer alias Archethic.BeaconChain.Subset alias Archethic.BeaconChain.Subset.SummaryCache alias Archethic.BeaconChain.SubsetRegistry @@ -33,6 +34,7 @@ defmodule Archethic.BeaconChainTest do setup do start_supervised!({SlotTimer, interval: "0 0 * * * *"}) + start_supervised!({SummaryTimer, interval: "0 0 * * * *"}) Enum.map(BeaconChain.list_subsets(), &start_supervised({Subset, subset: &1}, id: &1)) Enum.each(BeaconChain.list_subsets(), &Subset.start_link(subset: &1)) :ok @@ -67,7 +69,6 @@ defmodule Archethic.BeaconChainTest do describe "load_slot/1" do test "should fetch the transaction chain from the beacon involved nodes" do - SummaryTimer.start_link([interval: "0 0 * * * * *"], []) SummaryCache.start_link() File.mkdir_p!(Utils.mut_dir()) @@ -228,6 +229,7 @@ defmodule Archethic.BeaconChainTest do summary_time, P2P.authorized_and_available_nodes() ) + |> SummaryAggregate.aggregate() assert [addr1] == Enum.map(transaction_summaries, & &1.address) end @@ -328,6 +330,7 @@ defmodule Archethic.BeaconChainTest do summary_time, P2P.authorized_and_available_nodes() ) + |> SummaryAggregate.aggregate() transaction_addresses = Enum.map(transaction_summaries, & &1.address) @@ -418,17 +421,9 @@ defmodule Archethic.BeaconChainTest do summary_time, P2P.authorized_and_available_nodes() ) + |> SummaryAggregate.aggregate() - expected_availabilities = - [summary_v1, summary_v2, summary_v3, summary_v4] - |> Enum.map(& &1.node_availabilities) - |> Enum.flat_map(&Utils.bitstring_to_integer_list/1) - |> Enum.sort() - - assert node_availabilities - |> Enum.flat_map(& &1) - |> Enum.sort() == - expected_availabilities + assert <<1::1, 1::1, 1::1>> == node_availabilities end test "should find other beacon summaries and accumulate node P2P avg availabilities", %{ @@ -514,17 +509,9 @@ defmodule Archethic.BeaconChainTest do summary_time, P2P.authorized_and_available_nodes() ) + |> SummaryAggregate.aggregate() - expected_average_availabilities = - [summary_v1, summary_v2, summary_v3, summary_v4] - |> Enum.map(& &1.node_average_availabilities) - |> Enum.flat_map(& &1) - |> Enum.sort() - - assert node_average_availabilities - |> Enum.flat_map(& &1) - |> Enum.sort() == - expected_average_availabilities + assert [0.925, 0.8, 0.925, 0.85] == node_average_availabilities end end end diff --git a/test/archethic/bootstrap/network_init_test.exs b/test/archethic/bootstrap/network_init_test.exs index fdcb9ff3e2..ea272a467a 100644 --- a/test/archethic/bootstrap/network_init_test.exs +++ b/test/archethic/bootstrap/network_init_test.exs @@ -8,6 +8,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do alias Archethic.BeaconChain.ReplicationAttestation alias Archethic.BeaconChain.Slot, as: BeaconSlot alias Archethic.BeaconChain.SlotTimer, as: BeaconSlotTimer + alias Archethic.BeaconChain.SummaryTimer, as: BeaconSummaryTimer alias Archethic.BeaconChain.Subset, as: BeaconSubset alias Archethic.BeaconChain.SubsetRegistry, as: BeaconSubsetRegistry @@ -62,6 +63,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do setup do start_supervised!({BeaconSlotTimer, interval: "0 * * * * * *"}) + start_supervised!({BeaconSummaryTimer, interval: "0 0 * * * * *"}) Enum.each(BeaconChain.list_subsets(), &BeaconSubset.start_link(subset: &1)) start_supervised!({NodeRenewalScheduler, interval: "*/2 * * * * * *"})