Skip to content

Commit

Permalink
Verify attestation confirmations threshold
Browse files Browse the repository at this point in the history
if refused, attestation is postponed to next summary
  • Loading branch information
Neylix committed Mar 13, 2023
1 parent f7190cb commit 3b842b9
Show file tree
Hide file tree
Showing 9 changed files with 428 additions and 49 deletions.
26 changes: 25 additions & 1 deletion lib/archethic/beacon_chain/replication_attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do
alias Archethic.Crypto

alias Archethic.Election
alias Archethic.Election.HypergeometricDistribution

alias Archethic.P2P
alias Archethic.P2P.Message.GetTransactionSummary
Expand All @@ -23,6 +24,11 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do
confirmations: list({position :: non_neg_integer(), signature :: binary()})
}

# Minimum 10 nodes to start verifying the threshold
@minimum_nodes_for_threshold 10
# Minimum 35% of the expected confirmations must be present
@confirmations_threshold 0.35

@doc """
Serialize a replication attestation
Expand Down Expand Up @@ -213,7 +219,7 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do
Take a list of attestations and reduce them to return a list of unique attestation
for a transaction with all the confirmations
"""
@spec reduce_confirmations(Enumerable.t(t())) :: Stream.t(t())
@spec reduce_confirmations(Enumerable.t(t())) :: Enumerable.t(t())
def reduce_confirmations(attestations) do
attestations
|> Stream.transform(
Expand All @@ -240,6 +246,24 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do
)
end

@doc """
Return true if the attestation reached the minimum confirmations threshold
"""
@spec reached_threshold?(t()) :: boolean()
def reached_threshold?(%__MODULE__{
transaction_summary: %TransactionSummary{timestamp: timestamp},
confirmations: confirmations
}) do
# For security reason we reject the attestation with less than 35% of expected confirmations
with nb_nodes when nb_nodes > 0 <- P2P.authorized_and_available_nodes(timestamp) |> length(),
replicas_count <- HypergeometricDistribution.run_simulation(nb_nodes),
true <- replicas_count > @minimum_nodes_for_threshold do
length(confirmations) >= replicas_count * @confirmations_threshold
else
_ -> true
end
end

defp check_transaction_summary(nodes, expected_summary, timeout \\ 500)

defp check_transaction_summary([], _, _), do: {:error, :network_issue}
Expand Down
48 changes: 41 additions & 7 deletions lib/archethic/beacon_chain/summary_aggregate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ defmodule Archethic.BeaconChain.SummaryAggregate do

alias Archethic.Utils

require Logger

@availability_adding_time :archethic
|> Application.compile_env!(Archethic.SelfRepair.Scheduler)
|> Keyword.fetch!(:availability_application)
Expand Down Expand Up @@ -62,9 +64,7 @@ defmodule Archethic.BeaconChain.SummaryAggregate do
fn prev ->
transaction_attestations
|> Enum.filter(&(ReplicationAttestation.validate(&1, false) == :ok))
|> Enum.map(& &1.transaction_summary)
|> Enum.concat(prev)
|> Enum.uniq_by(& &1.address)
end
)
|> Map.update!(:availability_adding_time, &[availability_adding_time | &1])
Expand Down Expand Up @@ -106,12 +106,14 @@ defmodule Archethic.BeaconChain.SummaryAggregate do
Aggregate summaries batch
"""
@spec aggregate(t()) :: t()
def aggregate(agg = %__MODULE__{}) do
def aggregate(agg) do
agg
|> Map.update!(:transaction_summaries, fn transactions ->
transactions
|> Enum.uniq_by(& &1.address)
|> Enum.sort_by(& &1.timestamp, {:asc, DateTime})
|> Map.update!(:transaction_summaries, fn attestations ->
# Aggregate all confirmations, then filter the attestations that reached
# the threshold. Postpone to next summary the attestations that didn't reach the threshold
attestations
|> ReplicationAttestation.reduce_confirmations()
|> Enum.sort_by(& &1.transaction_summary.timestamp, {:asc, DateTime})
end)
|> Map.update!(:availability_adding_time, fn
[] ->
Expand All @@ -133,6 +135,38 @@ defmodule Archethic.BeaconChain.SummaryAggregate do
end)
end

@doc """
Filter replication attestations list of a summary to keep only the one that reached the
minimum confirmations threshold and return the refused ones
"""
@spec filter_reached_threshold(t()) :: {t(), list(ReplicationAttestation.t())}
def filter_reached_threshold(aggregate = %__MODULE__{transaction_summaries: attestations}) do
%{accepted: accepted_attestations, refused: refused_attestations} =
Enum.reduce(
attestations,
%{accepted: [], refused: []},
fn attestation, acc ->
if ReplicationAttestation.reached_threshold?(attestation) do
# Confirmations reached threshold we accept the attestation in the summary
Map.update!(acc, :accepted, &[attestation | &1])
else
# Confirmations didn't reached threshold, we postpone attestation in next summary
Map.update!(acc, :refused, &[attestation | &1])
end
end
)
|> Map.update!(:accepted, &Enum.reverse/1)

filtered_aggregate =
Map.put(
aggregate,
:transaction_summaries,
Enum.map(accepted_attestations, & &1.transaction_summary)
)

{filtered_aggregate, refused_attestations}
end

defp aggregate_node_availabilities(node_availabilities) do
node_availabilities
|> Enum.zip()
Expand Down
49 changes: 48 additions & 1 deletion lib/archethic/self_repair/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Archethic.SelfRepair.Sync do
@moduledoc false

alias Archethic.BeaconChain
alias Archethic.BeaconChain.ReplicationAttestation
alias Archethic.BeaconChain.Subset.P2PSampling
alias Archethic.BeaconChain.Summary
alias Archethic.BeaconChain.SummaryAggregate
Expand Down Expand Up @@ -133,7 +134,9 @@ defmodule Archethic.SelfRepair.Sync do
last_aggregate = BeaconChain.fetch_and_aggregate_summaries(last_summary_time, download_nodes)
ensure_download_last_aggregate(last_aggregate, download_nodes)

last_aggregate = aggregate_with_local_summaries(last_aggregate, last_summary_time)
last_aggregate =
aggregate_with_local_summaries(last_aggregate, last_summary_time)
|> verify_attestations_threshold()

summaries_aggregates
|> Stream.concat([last_aggregate])
Expand Down Expand Up @@ -205,6 +208,50 @@ defmodule Archethic.SelfRepair.Sync do
|> SummaryAggregate.aggregate()
end

defp verify_attestations_threshold(summary_aggregate) do
{filtered_summary_aggregate, refused_attestations} =
SummaryAggregate.filter_reached_threshold(summary_aggregate)

postpone_refused_attestations(refused_attestations)

filtered_summary_aggregate
end

defp postpone_refused_attestations(attestations) do
slot_time = DateTime.utc_now() |> BeaconChain.next_slot()
nodes = P2P.authorized_and_available_nodes(slot_time)

Enum.each(
attestations,
fn attestation = %ReplicationAttestation{
transaction_summary: %TransactionSummary{address: address, type: type},
confirmations: confirmations
} ->
# Postpone only if we are the current beacon slot node
# (otherwise all nodes would postpone as the self repair is run on all nodes)
slot_node? =
BeaconChain.subset_from_address(address)
|> Election.beacon_storage_nodes(
slot_time,
nodes,
Election.get_storage_constraints()
)
|> Utils.key_in_node_list?(Crypto.first_node_public_key())

if slot_node? do
Logger.debug(
"Attestation postponed to next summary with #{length(confirmations)} confirmations",
transaction_address: Base.encode16(address),
transaction_type: type
)

# Notification will be catched by subset and add the attestation in current Slot
PubSub.notify_replication_attestation(attestation)
end
end
)
end

@doc """
Process beacon summary to synchronize the transactions involving.
Expand Down
11 changes: 9 additions & 2 deletions lib/archethic_web/graphql_schema/resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,15 @@ defmodule ArchethicWeb.GraphQLSchema.Resolver do
end

:eq ->
BeaconChain.fetch_and_aggregate_summaries(next_datetime_summary_time, authorized_nodes)
|> SummaryAggregate.aggregate()
{summary_aggregate, _} =
BeaconChain.fetch_and_aggregate_summaries(
next_datetime_summary_time,
authorized_nodes
)
|> SummaryAggregate.aggregate()
|> SummaryAggregate.filter_reached_threshold()

summary_aggregate

:lt ->
case BeaconChain.fetch_summaries_aggregate(next_datetime_summary_time, authorized_nodes) do
Expand Down
4 changes: 3 additions & 1 deletion lib/archethic_web/live/chains/beacon_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,10 @@ defmodule ArchethicWeb.BeaconChainLive do
end

defp list_transactions_from_summaries(date = %DateTime{}) do
%SummaryAggregate{transaction_summaries: tx_summaries} =
{%SummaryAggregate{transaction_summaries: tx_summaries}, _} =
Archethic.fetch_and_aggregate_summaries(date)
|> SummaryAggregate.aggregate()
|> SummaryAggregate.filter_reached_threshold()

Enum.sort_by(tx_summaries, & &1.timestamp, {:desc, DateTime})
end
Expand Down
98 changes: 97 additions & 1 deletion test/archethic/beacon_chain/replication_attestation_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,104 @@
defmodule Archethic.BeaconChain.ReplicationAttestationTest do
use ExUnit.Case
use ArchethicCase

alias Archethic.BeaconChain.ReplicationAttestation
alias Archethic.BeaconChain.SummaryTimer

alias Archethic.TransactionChain.TransactionSummary

alias Archethic.P2P
alias Archethic.P2P.Node

doctest ReplicationAttestation

describe "reached_threshold?/1" do
setup do
# Summary timer each hour
start_supervised!({SummaryTimer, interval: "0 0 * * * *"})
# Create 10 nodes on last summary
Enum.each(0..9, fn i ->
P2P.add_and_connect_node(%Node{
ip: {88, 130, 19, i},
port: 3000 + i,
last_public_key: :crypto.strong_rand_bytes(32),
first_public_key: :crypto.strong_rand_bytes(32),
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: DateTime.utc_now() |> DateTime.add(-1, :hour),
enrollment_date: DateTime.utc_now() |> DateTime.add(-2, :hour)
})
end)

# Add two node in the current summary
P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3000,
last_public_key: :crypto.strong_rand_bytes(32),
first_public_key: :crypto.strong_rand_bytes(32),
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: DateTime.utc_now(),
enrollment_date: DateTime.utc_now()
})

# Add two node in the current summary
P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 2},
port: 3001,
last_public_key: :crypto.strong_rand_bytes(32),
first_public_key: :crypto.strong_rand_bytes(32),
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: DateTime.utc_now(),
enrollment_date: DateTime.utc_now()
})
end

test "should return true if attestation reached threshold" do
# First Replication with enough threshold
attestation = %ReplicationAttestation{
transaction_summary: %TransactionSummary{
timestamp: DateTime.utc_now() |> DateTime.add(-1, :hour)
},
confirmations: Enum.map(0..9, &{&1, "signature#{&1}"})
}

assert ReplicationAttestation.reached_threshold?(attestation)

# Second Replication without enough threshold
attestation = %ReplicationAttestation{
transaction_summary: %TransactionSummary{
timestamp: DateTime.utc_now() |> DateTime.add(-1, :hour)
},
confirmations: Enum.map(0..2, &{&1, "signature#{&1}"})
}

assert ReplicationAttestation.reached_threshold?(attestation)
end

test "should return false if attestation do not reach threshold" do
# First Replication with enough threshold
attestation = %ReplicationAttestation{
transaction_summary: %TransactionSummary{
timestamp: DateTime.utc_now()
},
confirmations: Enum.map(0..9, &{&1, "signature#{&1}"})
}

assert ReplicationAttestation.reached_threshold?(attestation)

# Second Replication without enough threshold
attestation = %ReplicationAttestation{
transaction_summary: %TransactionSummary{
timestamp: DateTime.utc_now()
},
confirmations: Enum.map(0..2, &{&1, "signature#{&1}"})
}

refute ReplicationAttestation.reached_threshold?(attestation)
end
end
end
Loading

0 comments on commit 3b842b9

Please sign in to comment.