Skip to content

Commit

Permalink
Verify attestation confirmations threshold
Browse files Browse the repository at this point in the history
if refused, attestation is postponed to next summary
  • Loading branch information
Neylix committed Mar 6, 2023
1 parent f7190cb commit 0f210f4
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 29 deletions.
85 changes: 85 additions & 0 deletions lib/archethic/beacon_chain/replication_attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do

alias Archethic.Crypto

alias Archethic.BeaconChain.SummaryTimer

alias Archethic.Election
alias Archethic.Election.HypergeometricDistribution

alias Archethic.P2P
alias Archethic.P2P.Message.GetTransactionSummary
Expand All @@ -23,6 +26,11 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do
confirmations: list({position :: non_neg_integer(), signature :: binary()})
}

# Minimum 10 nodes to start verifying the threshold
@minimum_nodes_for_threshold 10
# Minimum 35% of the expected confirmations must be present
@confirmations_threshold 0.35

@doc """
Serialize a replication attestation
Expand Down Expand Up @@ -240,6 +248,83 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do
)
end

@doc """
Filter a list of replication attestation to keep only the one that reached the
minimum confirmations threshold
"""
@spec filter_reached_threshold(Enumerable.t(t()), DateTime.t()) :: map()
def filter_reached_threshold(attestations, summary_time) do
# For security reason we reject the attestation with less than 35% of expected confirmations
# It may be due to latency, so we forward the attestation for next summary

# In a summary there is maybe attestations from last summary
previous_summary_time = summary_time |> SummaryTimer.previous_summary()

# replicas_count = 0 when we allow any confirmations number
previous_replicas_count =
with nb_nodes when nb_nodes > 0 <-
P2P.authorized_and_available_nodes(previous_summary_time, true) |> length(),
expected_number <- HypergeometricDistribution.run_simulation(nb_nodes),
true <- expected_number > @minimum_nodes_for_threshold do
expected_number
else
_ -> 0
end

current_replicas_count =
with nb_nodes when nb_nodes > 0 <-
P2P.authorized_and_available_nodes(summary_time, true) |> length(),
expected_number <- HypergeometricDistribution.run_simulation(nb_nodes),
true <- expected_number > @minimum_nodes_for_threshold do
expected_number
else
_ -> 0
end

if previous_replicas_count + current_replicas_count == 0 do
# Nothing to filter
%{accepted: attestations, refused: []}
else
do_filter_reached_threshold(
attestations,
previous_replicas_count,
current_replicas_count,
previous_summary_time
)
end
end

defp do_filter_reached_threshold(
attestations,
previous_replicas_count,
current_replicas_count,
previous_summary_time
) do
Enum.reduce(
attestations,
%{accepted: [], refused: []},
fn attestation = %__MODULE__{
transaction_summary: %TransactionSummary{timestamp: timestamp},
confirmations: confirmations
},
acc ->
replicas_count =
if DateTime.compare(timestamp, previous_summary_time) == :lt,
do: previous_replicas_count,
else: current_replicas_count

if length(confirmations) >= replicas_count * @confirmations_threshold do
# Confirmations reached threshold we accept the attestation in the summary
Map.update!(acc, :accepted, &[attestation | &1])
else
# Confirmations didn't reached threshold, we postpone attestation in next summary
Map.update!(acc, :refused, &[attestation | &1])
end
end
)
|> Map.update!(:accepted, &Enum.reverse/1)
end

defp check_transaction_summary(nodes, expected_summary, timeout \\ 500)

defp check_transaction_summary([], _, _), do: {:error, :network_issue}
Expand Down
42 changes: 36 additions & 6 deletions lib/archethic/beacon_chain/summary_aggregate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ defmodule Archethic.BeaconChain.SummaryAggregate do

alias Archethic.TransactionChain.TransactionSummary

alias Archethic.PubSub

alias Archethic.Utils

require Logger

@availability_adding_time :archethic
|> Application.compile_env!(Archethic.SelfRepair.Scheduler)
|> Keyword.fetch!(:availability_application)
Expand Down Expand Up @@ -62,9 +66,7 @@ defmodule Archethic.BeaconChain.SummaryAggregate do
fn prev ->
transaction_attestations
|> Enum.filter(&(ReplicationAttestation.validate(&1, false) == :ok))
|> Enum.map(& &1.transaction_summary)
|> Enum.concat(prev)
|> Enum.uniq_by(& &1.address)
end
)
|> Map.update!(:availability_adding_time, &[availability_adding_time | &1])
Expand Down Expand Up @@ -106,11 +108,15 @@ defmodule Archethic.BeaconChain.SummaryAggregate do
Aggregate summaries batch
"""
@spec aggregate(t()) :: t()
def aggregate(agg = %__MODULE__{}) do
def aggregate(agg = %__MODULE__{summary_time: summary_time}) do
agg
|> Map.update!(:transaction_summaries, fn transactions ->
transactions
|> Enum.uniq_by(& &1.address)
|> Map.update!(:transaction_summaries, fn attestations ->
# Aggregate all confirmations, then filter the attestations that reached
# the threshold. Postpone to next summary the attestations that didn't reach the threshold
attestations
|> ReplicationAttestation.reduce_confirmations()
|> filter_reached_threshold(summary_time)
|> Enum.map(& &1.transaction_summary)
|> Enum.sort_by(& &1.timestamp, {:asc, DateTime})
end)
|> Map.update!(:availability_adding_time, fn
Expand All @@ -133,6 +139,30 @@ defmodule Archethic.BeaconChain.SummaryAggregate do
end)
end

defp filter_reached_threshold(attestations, summary_time) do
%{accepted: accepted_attestations, refused: refused_attestations} =
ReplicationAttestation.filter_reached_threshold(attestations, summary_time)

Enum.each(
refused_attestations,
fn attestation = %ReplicationAttestation{
transaction_summary: %TransactionSummary{address: address, type: type},
confirmations: confirmations
} ->
Logger.debug(
"Attestation postponed to next summary with #{length(confirmations)} confirmations",
transaction_address: Base.encode16(address),
transaction_type: type
)

# Notification will be catched by subset and add the attestation in current Slot
PubSub.notify_replication_attestation(attestation)
end
)

accepted_attestations
end

defp aggregate_node_availabilities(node_availabilities) do
node_availabilities
|> Enum.zip()
Expand Down
1 change: 1 addition & 0 deletions lib/archethic_web/live/chains/beacon_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ defmodule ArchethicWeb.BeaconChainLive do
defp list_transactions_from_summaries(date = %DateTime{}) do
%SummaryAggregate{transaction_summaries: tx_summaries} =
Archethic.fetch_and_aggregate_summaries(date)
|> SummaryAggregate.aggregate()

Enum.sort_by(tx_summaries, & &1.timestamp, {:desc, DateTime})
end
Expand Down
103 changes: 102 additions & 1 deletion test/archethic/beacon_chain/replication_attestation_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,109 @@
defmodule Archethic.BeaconChain.ReplicationAttestationTest do
use ExUnit.Case
use ArchethicCase

alias Archethic.BeaconChain.ReplicationAttestation
alias Archethic.BeaconChain.SummaryTimer

alias Archethic.TransactionChain.TransactionSummary

alias Archethic.P2P
alias Archethic.P2P.Node

doctest ReplicationAttestation

describe "filter_unreached_threshold/2" do
setup do
# Summary timer each hour
start_supervised!({SummaryTimer, interval: "0 0 * * * *"})
# Create 10 nodes on last summary
Enum.each(0..9, fn i ->
P2P.add_and_connect_node(%Node{
ip: {88, 130, 19, i},
port: 3000 + i,
last_public_key: :crypto.strong_rand_bytes(32),
first_public_key: :crypto.strong_rand_bytes(32),
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: DateTime.utc_now() |> DateTime.add(-1, :hour),
enrollment_date: DateTime.utc_now() |> DateTime.add(-2, :hour)
})
end)

# Add two node in the current summary
P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3000,
last_public_key: :crypto.strong_rand_bytes(32),
first_public_key: :crypto.strong_rand_bytes(32),
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: DateTime.utc_now(),
enrollment_date: DateTime.utc_now()
})

# Add two node in the current summary
P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 2},
port: 3001,
last_public_key: :crypto.strong_rand_bytes(32),
first_public_key: :crypto.strong_rand_bytes(32),
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: DateTime.utc_now(),
enrollment_date: DateTime.utc_now()
})
end

test "should not remove attestation if minimum node is not reached" do
attestations = [
# First Replication with enough threshold
%ReplicationAttestation{
transaction_summary: %TransactionSummary{
timestamp: DateTime.utc_now() |> DateTime.add(-1, :hour)
},
confirmations: Enum.map(0..9, &{&1, "signature#{&1}"})
},
# Second Replication without enough threshold
%ReplicationAttestation{
transaction_summary: %TransactionSummary{
timestamp: DateTime.utc_now() |> DateTime.add(-1, :hour)
},
confirmations: Enum.map(0..2, &{&1, "signature#{&1}"})
}
]

summary_time = DateTime.utc_now() |> SummaryTimer.next_summary()

assert %{accepted: ^attestations, refused: []} =
ReplicationAttestation.filter_reached_threshold(attestations, summary_time)
end

test "should remove attestation if threshold is not reached" do
# First Replication with enough threshold
attestation1 = %ReplicationAttestation{
transaction_summary: %TransactionSummary{
timestamp: DateTime.utc_now()
},
confirmations: Enum.map(0..9, &{&1, "signature#{&1}"})
}

# Second Replication without enough threshold
attestation2 = %ReplicationAttestation{
transaction_summary: %TransactionSummary{
timestamp: DateTime.utc_now()
},
confirmations: Enum.map(0..2, &{&1, "signature#{&1}"})
}

attestations = [attestation1, attestation2]

summary_time = DateTime.utc_now() |> SummaryTimer.next_summary()

assert %{accepted: [^attestation1], refused: [^attestation2]} =
ReplicationAttestation.filter_reached_threshold(attestations, summary_time)
end
end
end
83 changes: 82 additions & 1 deletion test/archethic/beacon_chain/summary_aggregate_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,89 @@
defmodule Archethic.BeaconChain.SummaryAggregateTest do
use ExUnit.Case
use ArchethicCase

alias Archethic.BeaconChain.ReplicationAttestation
alias Archethic.BeaconChain.SummaryAggregate
alias Archethic.BeaconChain.SummaryTimer
alias Archethic.TransactionChain.TransactionSummary

alias Archethic.P2P
alias Archethic.P2P.Node

alias Archethic.PubSub

doctest SummaryAggregate

test "aggregate_slots should postpone attestation if refused" do
# Summary timer each hour
start_supervised!({SummaryTimer, interval: "0 0 * * * *"})
# Create 10 nodes on last summary
Enum.each(0..9, fn i ->
P2P.add_and_connect_node(%Node{
ip: {88, 130, 19, i},
port: 3000 + i,
last_public_key: :crypto.strong_rand_bytes(32),
first_public_key: :crypto.strong_rand_bytes(32),
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: DateTime.utc_now() |> DateTime.add(-1, :hour),
enrollment_date: DateTime.utc_now() |> DateTime.add(-2, :hour)
})
end)

# Add two node in the current summary
P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3000,
last_public_key: :crypto.strong_rand_bytes(32),
first_public_key: :crypto.strong_rand_bytes(32),
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: DateTime.utc_now(),
enrollment_date: DateTime.utc_now()
})

# Add two node in the current summary
P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 2},
port: 3001,
last_public_key: :crypto.strong_rand_bytes(32),
first_public_key: :crypto.strong_rand_bytes(32),
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: DateTime.utc_now(),
enrollment_date: DateTime.utc_now()
})

# First Replication with enough threshold
attestation1 = %ReplicationAttestation{
transaction_summary: %TransactionSummary{
address: :crypto.strong_rand_bytes(32),
timestamp: DateTime.utc_now()
},
confirmations: Enum.map(0..9, &{&1, "signature#{&1}"})
}

# Second Replication without enough threshold
attestation2 = %ReplicationAttestation{
transaction_summary: %TransactionSummary{
address: :crypto.strong_rand_bytes(32),
timestamp: DateTime.utc_now()
},
confirmations: Enum.map(0..2, &{&1, "signature#{&1}"})
}

attestations = [attestation1, attestation2]

summary_time = DateTime.utc_now() |> SummaryTimer.next_summary()

PubSub.register_to_new_replication_attestations()

%SummaryAggregate{summary_time: summary_time, transaction_summaries: attestations}
|> SummaryAggregate.aggregate()

assert_receive {:new_replication_attestation, ^attestation2}
end
end
Loading

0 comments on commit 0f210f4

Please sign in to comment.