Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix slot missing transaction #923

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions lib/archethic/beacon_chain/replication_attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do
alias Archethic.Crypto

alias Archethic.Election
alias Archethic.Election.HypergeometricDistribution

alias Archethic.P2P
alias Archethic.P2P.Message.GetTransactionSummary
Expand All @@ -23,6 +24,11 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do
confirmations: list({position :: non_neg_integer(), signature :: binary()})
}

# Minimum 10 nodes to start verifying the threshold
@minimum_nodes_for_threshold 10
# Minimum 35% of the expected confirmations must be present
@confirmations_threshold 0.35

@doc """
Serialize a replication attestation

Expand Down Expand Up @@ -209,6 +215,55 @@ defmodule Archethic.BeaconChain.ReplicationAttestation do
end
end

@doc """
Take a list of attestations and reduce them to return a list of unique attestation
for a transaction with all the confirmations
"""
@spec reduce_confirmations(Enumerable.t(t())) :: Enumerable.t(t())
def reduce_confirmations(attestations) do
attestations
|> Stream.transform(
# start function, init acc
fn -> %{} end,
# reducer function, return empty enum, accumulate replication attestation by address in acc
fn attestation = %__MODULE__{
transaction_summary: %TransactionSummary{address: address},
confirmations: confirmations
},
acc ->
# Accumulate distinct confirmations in a replication attestation
acc =
Map.update(acc, address, attestation, fn reduced_attest ->
Map.update!(reduced_attest, :confirmations, &((&1 ++ confirmations) |> Enum.uniq()))
end)

{[], acc}
end,
# last function, return acc in the enumeration
fn acc -> {Map.values(acc), acc} end,
# after function, do nothing
fn _ -> :ok end
)
end

@doc """
Return true if the attestation reached the minimum confirmations threshold
"""
@spec reached_threshold?(t()) :: boolean()
def reached_threshold?(%__MODULE__{
transaction_summary: %TransactionSummary{timestamp: timestamp},
confirmations: confirmations
}) do
# For security reason we reject the attestation with less than 35% of expected confirmations
with nb_nodes when nb_nodes > 0 <- P2P.authorized_and_available_nodes(timestamp) |> length(),
replicas_count <- HypergeometricDistribution.run_simulation(nb_nodes),
true <- replicas_count > @minimum_nodes_for_threshold do
length(confirmations) >= replicas_count * @confirmations_threshold
else
_ -> true
end
end

defp check_transaction_summary(nodes, expected_summary, timeout \\ 500)

defp check_transaction_summary([], _, _), do: {:error, :network_issue}
Expand Down
92 changes: 64 additions & 28 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule Archethic.BeaconChain.Subset do
alias Archethic.P2P.Message.NewBeaconSlot
alias Archethic.P2P.Message.BeaconUpdate
alias Archethic.P2P.Message.TransactionSummaryMessage
alias Archethic.P2P.Message.ReplicationAttestationMessage

alias Archethic.PubSub

Expand Down Expand Up @@ -91,8 +92,7 @@ defmodule Archethic.BeaconChain.Subset do
subset: subset,
slot_time: SlotTimer.next_slot(DateTime.utc_now())
},
subscribed_nodes: [],
postponed: %{end_of_sync: [], transaction_attestations: []}
subscribed_nodes: []
}}
end

Expand Down Expand Up @@ -179,11 +179,14 @@ defmodule Archethic.BeaconChain.Subset do
state = %{
current_slot: current_slot = %Slot{slot_time: slot_time},
subset: subset,
subscribed_nodes: subscribed_nodes
subscribed_nodes: subscribed_nodes,
node_public_key: node_public_key
}
) do
with ^subset <- BeaconChain.subset_from_address(address),
^slot_time <- SlotTimer.next_slot(timestamp) do
true <- is_valid_time?(timestamp, slot_time),
{:forward?, false} <-
{:forward?, forward_attestation?(slot_time, subset, node_public_key)} do
{new_tx?, new_slot} =
Slot.add_transaction_attestation(
current_slot,
Expand Down Expand Up @@ -230,23 +233,53 @@ defmodule Archethic.BeaconChain.Subset do
{:noreply, %{state | current_slot: new_slot}}
end
else
next_slot_time = %DateTime{} ->
new_state = update_in(state, [:postponed, :transaction_attestations], &[attestation | &1])
{:forward?, true} ->
forward_attestation(attestation, subset, slot_time)
{:noreply, state}

Logger.info(
"Transaction #{type}@#{Base.encode16(address)} will be added to the next beacon chain (#{DateTime.to_string(next_slot_time)} slot) - tx timestamp: #{timestamp} - current slot time: #{slot_time}",
false ->
Logger.warning(
"Transaction #{type}@#{Base.encode16(address)} is refused as it is not in the expected range time (in #{DateTime.to_string(slot_time)} slot)",
beacon_subset: Base.encode16(subset)
)

notify_subscribed_nodes(subscribed_nodes, attestation)

{:noreply, new_state}
{:noreply, state}

_ ->
{:noreply, state}
end
end

defp is_valid_time?(timestamp, slot_time) do
# We verify if the transaction timestamp is over than the current and 2 previous summary time
# We check previous summary time in case of remainder tx due to latency
# S1 ----------- S2 -----n----- S3
# We want to verify the transaction is between S1 and S3, "n" is the current time
next_summary = SummaryTimer.next_summary(slot_time)

previous_summary =
slot_time |> SummaryTimer.previous_summary() |> SummaryTimer.previous_summary()

DateTime.compare(timestamp, previous_summary) in [:eq, :gt] and
DateTime.compare(timestamp, next_summary) == :lt
end

defp forward_attestation?(slot_time, subset, node_public_key) do
# We need to forward the transaction if this is the last slot of a summary
# and we are not a summary node because the current slot will not be sent to summary node
summary_time?(slot_time) and not beacon_summary_node?(subset, slot_time, node_public_key)
end

defp forward_attestation(attestation, subset, slot_time) do
nodes = P2P.authorized_and_available_nodes(slot_time, true)

subset
|> Election.beacon_storage_nodes(slot_time, nodes)
|> P2P.broadcast_message(
ReplicationAttestationMessage.from_replication_attestation(attestation)
)
end

defp notify_subscribed_nodes(nodes, %ReplicationAttestation{
transaction_summary:
tx_summary = %TransactionSummary{timestamp: timestamp, address: address}
Expand Down Expand Up @@ -296,16 +329,7 @@ defmodule Archethic.BeaconChain.Subset do

defp update_p2p_view?(_), do: true

defp next_state(
state = %{
subset: subset,
postponed: %{
transaction_attestations: transaction_attestations,
end_of_sync: end_of_sync
}
},
time
) do
defp next_state(state = %{subset: subset}, time) do
next_time = SlotTimer.next_slot(time)

state
Expand All @@ -314,15 +338,11 @@ defmodule Archethic.BeaconChain.Subset do
%Slot{
subset: subset,
slot_time: next_time,
transaction_attestations: transaction_attestations,
end_of_node_synchronizations: end_of_sync
transaction_attestations: [],
end_of_node_synchronizations: []
}
)
|> Map.put(:postponed, %{transaction_attestations: [], end_of_sync: []})
|> Map.put(
:subscribed_nodes,
[]
)
|> Map.put(:subscribed_nodes, [])
end

defp broadcast_beacon_slot(subset, next_time, slot) do
Expand Down Expand Up @@ -391,4 +411,20 @@ defmodule Archethic.BeaconChain.Subset do
end

defp ensure_p2p_view(slot = %Slot{}, _), do: slot

def code_change(
"1.0.7",
state = %{postponed: %{transaction_attestations: postponed_attestations}},
_extra
) do
new_state =
Enum.reduce(postponed_attestations, state, fn attestation, acc ->
Map.update!(acc, :current_slot, &Slot.add_transaction_attestation(&1, attestation))
end)
|> Map.delete(:postponed)

{:ok, new_state}
end

def code_change(_, state, _), do: {:ok, state}
end
6 changes: 1 addition & 5 deletions lib/archethic/beacon_chain/summary.ex
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,7 @@ defmodule Archethic.BeaconChain.Summary do
transaction_attestations =
slots
|> Stream.flat_map(& &1.transaction_attestations)
|> Stream.uniq_by(fn %ReplicationAttestation{
transaction_summary: %TransactionSummary{address: address}
} ->
address
end)
|> ReplicationAttestation.reduce_confirmations()
|> Enum.sort_by(
fn %ReplicationAttestation{
transaction_summary: %TransactionSummary{timestamp: timestamp}
Expand Down
Loading