Skip to content

Commit

Permalink
Add quorum_read/4 (#369)
Browse files Browse the repository at this point in the history
Define a new function in the P2P module to handle message sending to
multiple nodes and handle reads using monotonic quorum reads.

The closest node is used to fetch the first results and others results are fetched from a subgroup defined by a consistency level argument.

The results should be newer than the prior one, based on some conflict resolver function passed as argument.
  • Loading branch information
Samuel committed Jun 14, 2022
1 parent 4c7b023 commit 0629896
Show file tree
Hide file tree
Showing 28 changed files with 1,290 additions and 905 deletions.
221 changes: 63 additions & 158 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,13 @@ defmodule Archethic do
alias __MODULE__.P2P

alias __MODULE__.P2P.Message.Balance
alias __MODULE__.P2P.Message.Error
alias __MODULE__.P2P.Message.GetBalance
alias __MODULE__.P2P.Message.GetLastTransaction
alias __MODULE__.P2P.Message.GetLastTransactionAddress
alias __MODULE__.P2P.Message.GetTransaction
alias __MODULE__.P2P.Message.GetTransactionChain
alias __MODULE__.P2P.Message.GetTransactionChainLength
alias __MODULE__.P2P.Message.GetTransactionInputs
alias __MODULE__.P2P.Message.LastTransactionAddress
alias __MODULE__.P2P.Message.NewTransaction
alias __MODULE__.P2P.Message.NotFound
alias __MODULE__.P2P.Message.Ok
alias __MODULE__.P2P.Message.StartMining
alias __MODULE__.P2P.Message.TransactionChainLength
alias __MODULE__.P2P.Message.TransactionInputList
alias __MODULE__.P2P.Message.TransactionList
alias __MODULE__.P2P.Node

alias __MODULE__.TransactionChain
alias __MODULE__.TransactionChain.Transaction
alias __MODULE__.TransactionChain.TransactionInput

Expand All @@ -47,30 +36,14 @@ defmodule Archethic do
def search_transaction(address) when is_binary(address) do
storage_nodes = Election.chain_storage_nodes(address, P2P.available_nodes())

storage_nodes
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
|> get_transaction(address)
end

defp get_transaction([node | rest], address) do
case P2P.send_message(node, %GetTransaction{address: address}) do
{:ok, tx = %Transaction{}} ->
{:ok, tx}

{:ok, %NotFound{}} ->
{:error, :transaction_not_exists}

{:ok, %Error{}} ->
{:error, :transaction_invalid}
nodes =
storage_nodes
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

{:error, _} ->
get_transaction(rest, address)
end
TransactionChain.fetch_transaction_remotely(address, nodes)
end

defp get_transaction([], _), do: {:error, :network_issue}

@doc """
Send a new transaction in the network to be mined. The current node will act as welcome node
"""
Expand Down Expand Up @@ -138,61 +111,32 @@ defmodule Archethic do
| {:error, :transaction_not_exists}
| {:error, :transaction_invalid}
| {:error, :network_issue}
def get_last_transaction(address) do
address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
|> get_last_transaction(address)
end

defp get_last_transaction([node | rest], address) do
case P2P.send_message(node, %GetLastTransaction{address: address}) do
{:ok, tx = %Transaction{}} ->
{:ok, tx}

{:ok, %NotFound{}} ->
{:error, :transaction_not_exists}

{:ok, %Error{}} ->
{:error, :transaction_invalid}

{:error, _} ->
get_last_transaction(rest, address)
def get_last_transaction(address) when is_binary(address) do
case get_last_transaction_address(address) do
{:ok, last_address} ->
nodes =
last_address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

TransactionChain.fetch_transaction_remotely(last_address, nodes)

{:error, :network_issue} = e ->
e
end
end

defp get_last_transaction([], _), do: {:error, :network_issue}

@doc """
Retrieve the last transaction address for a chain from the closest nodes
"""
@spec get_last_transaction_address(address :: binary()) ::
{:ok, binary()}
| {:error, :network_issue}
def get_last_transaction_address(address) do
address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
|> get_last_transaction_address(address)
end

defp get_last_transaction_address([node | rest], address) do
case P2P.send_message(node, %GetLastTransactionAddress{
address: address,
timestamp: DateTime.utc_now()
}) do
{:ok, %LastTransactionAddress{address: last_address}} ->
{:ok, last_address}

{:error, _} ->
get_last_transaction_address(rest, address)
end
def get_last_transaction_address(address) when is_binary(address) do
TransactionChain.resolve_last_address(address)
end

defp get_last_transaction_address([], _), do: {:error, :network_issue}

@doc """
Retrieve the balance from an address from the closest nodes
"""
Expand Down Expand Up @@ -223,117 +167,78 @@ defmodule Archethic do
@spec get_transaction_inputs(binary()) ::
{:ok, list(TransactionInput.t())} | {:error, :network_issue}
def get_transaction_inputs(address) when is_binary(address) do
address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
|> get_transaction_inputs(address)
end

defp get_transaction_inputs([node | rest], address) do
case P2P.send_message(node, %GetTransactionInputs{address: address}) do
{:ok, %TransactionInputList{inputs: inputs}} ->
{:ok, inputs}
nodes =
address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

{:error, _} ->
get_transaction_inputs(rest, address)
end
TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now())
end

defp get_transaction_inputs([], _), do: {:error, :network_issue}

@doc """
Retrieve a transaction chain based on an address from the closest nodes
"""
@spec get_transaction_chain(binary()) :: {:ok, list(Transaction.t())} | {:error, :network_issue}
def get_transaction_chain(address) when is_binary(address) do
local_available_nodes = locally_available_nodes(address)
get_transaction_chain(local_available_nodes, address)
end

defp get_transaction_chain(nodes, address, opts \\ [], acc \\ [])

defp get_transaction_chain([node | rest], address, opts, acc) do
case P2P.send_message(node, %GetTransactionChain{
address: address,
paging_state: Keyword.get(opts, :paging_state)
}) do
{:ok, %TransactionList{transactions: transactions, more?: false}} ->
{:ok, Enum.uniq_by(acc ++ transactions, & &1.address)}

{:ok, %TransactionList{transactions: transactions, more?: true, paging_state: paging_state}} ->
get_transaction_chain(
[node | rest],
address,
[paging_state: paging_state],
Enum.uniq_by(acc ++ transactions, & &1.address)
)
nodes =
address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

{:error, _} ->
get_transaction_chain(rest, address, opts, acc)
try do
chain =
address
|> TransactionChain.stream_remotely(nodes)
|> Enum.to_list()
|> List.flatten()

{:ok, chain}
catch
_ ->
{:error, :network_issue}
end
end

defp get_transaction_chain([], _, _, _), do: {:error, :network_issue}

defp locally_available_nodes(address) do
address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
end

@doc """
Retrieve a transaction chain based on an address from the closest nodes
by setting `paging_address as an offset address.
"""
@spec get_transaction_chain_by_paging_address(binary(), binary()) ::
{:ok, list(Transaction.t())} | {:error, :network_issue}
def get_transaction_chain_by_paging_address(address, paging_address) when is_binary(address) do
options = [paging_state: paging_address]
local_available_nodes = locally_available_nodes(address)
transaction_chain_by_paging_address(local_available_nodes, address, options)
end
nodes =
address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

defp transaction_chain_by_paging_address([node | rest], address, options) do
case P2P.send_message(node, %GetTransactionChain{
address: address,
paging_state: Keyword.get(options, :paging_state)
}) do
{:ok, %TransactionList{transactions: transactions}} ->
{:ok, transactions}
try do
chain_page =
address
|> TransactionChain.stream_remotely(nodes, paging_address)
|> Enum.at(0)

{:error, _} ->
transaction_chain_by_paging_address(rest, address, options)
{:ok, chain_page}
catch
_ ->
{:error, :network_issue}
end
end

defp transaction_chain_by_paging_address([], _address, _options) do
{:error, :network_issue}
end

@doc """
Retrieve the number of transaction in a transaction chain from the closest nodes
"""
@spec get_transaction_chain_length(binary()) ::
{:ok, non_neg_integer()} | {:error, :network_issue}
def get_transaction_chain_length(address) when is_binary(address) do
address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
|> get_transaction_chain_length(address)
end

defp get_transaction_chain_length([node | rest], address) do
case P2P.send_message(node, %GetTransactionChainLength{address: address}) do
{:ok, %TransactionChainLength{length: length}} ->
{:ok, length}
nodes =
address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

{:error, _} ->
get_transaction_chain_length(rest, address)
end
TransactionChain.fetch_size_remotely(address, nodes)
end

defp get_transaction_chain_length([], _), do: {:error, :network_issue}
end
55 changes: 36 additions & 19 deletions lib/archethic/beacon_chain/slot/validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ defmodule Archethic.BeaconChain.Slot.Validation do

with :ok <-
ReplicationAttestation.validate(attestation),
:ok <- check_transaction_summary(storage_nodes, address, tx_summary) do
:ok <- check_transaction_summary(storage_nodes, tx_summary) do
true
else
{:error, reason} ->
Expand All @@ -62,39 +62,56 @@ defmodule Archethic.BeaconChain.Slot.Validation do
end
end

defp check_transaction_summary(nodes, address, expected_summary, timeout \\ 500)
defp check_transaction_summary(nodes, expected_summary, timeout \\ 500)

defp check_transaction_summary([], _, _), do: {:error, :network_issue}

defp check_transaction_summary(
[node | rest],
address,
expected_summary,
timeout
nodes,
expected_summary = %TransactionSummary{
address: address,
type: type
},
_timeout
) do
case P2P.send_message(node, %GetTransactionSummary{address: address}, timeout) do
conflict_resolver = fn results ->
case Enum.find(results, &match?(%TransactionSummary{address: ^address, type: ^type}, &1)) do
nil ->
%NotFound{}

tx_summary ->
tx_summary
end
end

case P2P.quorum_read(
nodes,
%GetTransactionSummary{address: address},
conflict_resolver
) do
{:ok, ^expected_summary} ->
:ok

{:ok, recv = %TransactionSummary{}} ->
Logger.debug(
"Transaction summary received is different #{inspect(recv)} - expect #{inspect(expected_summary)}"
Logger.warning(
"Transaction summary received is different #{inspect(recv)} - expect #{inspect(expected_summary)}",
transaction_address: Base.encode16(address),
transaction_type: type
)

{:error, :invalid_summary}

{:ok, %NotFound{}} ->
Logger.debug("Transaction summary was not found at #{Node.endpoint(node)}")
check_transaction_summary(rest, address, expected_summary, timeout)
Logger.warning("Transaction summary was not found",
transaction_address: Base.encode16(address),
transaction_type: type
)

{:error, :timeout} ->
check_transaction_summary(rest, address, expected_summary, trunc(timeout * 1.5))
{:error, :invalid_summary}

{:error, :closed} ->
check_transaction_summary(rest, address, expected_summary, timeout)
{:error, :network_issue} ->
{:error, :network_issue}
end
end

defp check_transaction_summary([], _, _, _), do: {:error, :network_issue}

defp transaction_storage_nodes(address, timestamp) do
authorized_nodes =
case P2P.authorized_nodes(timestamp) do
Expand Down
Loading

0 comments on commit 0629896

Please sign in to comment.