Skip to content

Commit

Permalink
Integrate quorum queris in self-repair
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel committed Jun 8, 2022
1 parent 3aa8491 commit dc3e16b
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 54 deletions.
104 changes: 66 additions & 38 deletions lib/archethic/self_repair/sync/beacon_summary_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,37 @@ defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler do
def get_full_beacon_summary(summary_time, subset, nodes) do
summary_address = Crypto.derive_beacon_chain_address(subset, summary_time, true)

Task.Supervisor.async_stream_nolink(
TaskSupervisor,
Enum.filter(nodes, &Node.locally_available?/1),
fn node ->
P2P.send_message(node, %GetBeaconSummary{address: summary_address})
end,
on_timeout: :kill_task,
ordered: false
)
|> Enum.filter(&match?({:ok, {:ok, %BeaconSummary{}}}, &1))
|> Enum.map(fn {:ok, {:ok, summary}} -> summary end)
|> Enum.reject(&BeaconSummary.empty?/1)
|> Enum.reduce(
%{
transaction_attestations: [],
node_availabilities: [],
node_average_availabilities: [],
end_of_node_synchronizations: []
},
&do_reduce_beacon_summaries/2
)
|> aggregate(summary_time, subset)
resolve_conflicts = fn results ->
if Enum.any?(results, &match?(%BeaconSummary{}, &1)) do
results
|> Enum.reject(&BeaconSummary.empty?/1)
# We are aggregating all summaries to avoid lost of information
|> Enum.reduce(
%{
transaction_attestations: [],
node_availabilities: [],
node_average_availabilities: []
},
&do_reduce_beacon_summaries/2
)
|> aggregate(summary_time, subset)
else
%NotFound{}
end
end

case P2P.send_and_read_with_quorum(
nodes,
%GetBeaconSummary{address: summary_address},
length(nodes),
resolve_conflicts
) do
{:ok, summary = %BeaconSummary{}} ->
summary

{:ok, %NotFound{}} ->
%BeaconSummary{summary_time: summary_time, subset: subset}
end
end

defp do_reduce_beacon_summaries(
Expand Down Expand Up @@ -147,27 +156,46 @@ defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler do
{:ok, BeaconSummary.t() | NotFound.t()} | {:error, any()}
def download_summary(_beacon_address, [], _), do: {:ok, %NotFound{}}

def download_summary(beacon_address, nodes, patch) do
nodes
|> P2P.nearest_nodes(patch)
|> do_get_download_summary(beacon_address, nil)
def download_summary(beacon_address, nodes, _patch) do
resolve_conflicts = fn results ->
if Enum.any?(results, &match?(%BeaconSummary{}, &1)) do
results
|> Enum.reject(&BeaconSummary.empty?/1)
|> Enum.max(
fn %BeaconSummary{transaction_attestations: attestations_a},
%BeaconSummary{transaction_attestations: attestations_b} ->
length(attestations_a) > length(attestations_b)
end,
fn -> %NotFound{} end
)
else
%NotFound{}
end
end

P2P.send_and_read_with_quorum(
nodes,
%GetBeaconSummary{address: beacon_address},
length(nodes),
resolve_conflicts
)
end

defp do_get_download_summary([node | rest], address, prev_result) do
case P2P.send_message(node, %GetBeaconSummary{address: address}) do
{:ok, summary = %BeaconSummary{}} ->
{:ok, summary}
# defp do_get_download_summary([node | rest], address, prev_result) do
# case P2P.send_message(node, %GetBeaconSummary{address: address}) do
# {:ok, summary = %BeaconSummary{}} ->
# {:ok, summary}

{:ok, %NotFound{}} ->
do_get_download_summary(rest, address, %NotFound{})
# {:ok, %NotFound{}} ->
# do_get_download_summary(rest, address, %NotFound{})

{:error, _} ->
do_get_download_summary(rest, address, prev_result)
end
end
# {:error, _} ->
# do_get_download_summary(rest, address, prev_result)
# end
# end

defp do_get_download_summary([], _, %NotFound{}), do: {:ok, %NotFound{}}
defp do_get_download_summary([], _, _), do: {:error, :network_issue}
# defp do_get_download_summary([], _, %NotFound{}), do: {:ok, %NotFound{}}
# defp do_get_download_summary([], _, _), do: {:error, :network_issue}

@doc """
Process beacon summary to synchronize the transactions involving.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler.TransactionHandler do
alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.GetTransaction
alias Archethic.P2P.Node

alias Archethic.Replication

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionSummary

Expand Down Expand Up @@ -67,34 +67,28 @@ defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler.TransactionHandler do
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

case fetch_transaction(storage_nodes, address) do
case TransactionChain.fetch_transaction_remotely(address, storage_nodes) do
{:ok, tx = %Transaction{}} ->
tx

{:error, :network_issue} ->
{:error, :transaction_not_exists} ->
Logger.error("Cannot fetch the transaction to sync",
transaction_address: Base.encode16(address),
transaction_type: type
)

raise "Network issue during during self repair"
end
end
raise "Transaction doesn't exist"

defp fetch_transaction([node | rest], address) do
case P2P.send_message(node, %GetTransaction{address: address}) do
{:ok, tx = %Transaction{}} ->
{:ok, tx}
{:error, :network_issue} ->
Logger.error("Cannot fetch the transaction to sync",
transaction_address: Base.encode16(address),
transaction_type: type
)

_ ->
fetch_transaction(rest, address)
raise "Network issue during during self repair"
end
end

defp fetch_transaction([], _) do
{:error, :network_issue}
end

@spec process_transaction(Transaction.t()) :: :ok | {:error, :invalid_transaction}
def process_transaction(
tx = %Transaction{
Expand Down

0 comments on commit dc3e16b

Please sign in to comment.