diff --git a/lib/archethic.ex b/lib/archethic.ex index a1dbeba19..6a8c4e062 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -13,24 +13,13 @@ defmodule Archethic do alias __MODULE__.P2P alias __MODULE__.P2P.Message.Balance - alias __MODULE__.P2P.Message.Error alias __MODULE__.P2P.Message.GetBalance - alias __MODULE__.P2P.Message.GetLastTransaction - alias __MODULE__.P2P.Message.GetLastTransactionAddress - alias __MODULE__.P2P.Message.GetTransaction - alias __MODULE__.P2P.Message.GetTransactionChain - alias __MODULE__.P2P.Message.GetTransactionChainLength - alias __MODULE__.P2P.Message.GetTransactionInputs - alias __MODULE__.P2P.Message.LastTransactionAddress alias __MODULE__.P2P.Message.NewTransaction - alias __MODULE__.P2P.Message.NotFound alias __MODULE__.P2P.Message.Ok alias __MODULE__.P2P.Message.StartMining - alias __MODULE__.P2P.Message.TransactionChainLength - alias __MODULE__.P2P.Message.TransactionInputList - alias __MODULE__.P2P.Message.TransactionList alias __MODULE__.P2P.Node + alias __MODULE__.TransactionChain alias __MODULE__.TransactionChain.Transaction alias __MODULE__.TransactionChain.TransactionInput @@ -47,30 +36,14 @@ defmodule Archethic do def search_transaction(address) when is_binary(address) do storage_nodes = Election.chain_storage_nodes(address, P2P.available_nodes()) - storage_nodes - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) - |> get_transaction(address) - end - - defp get_transaction([node | rest], address) do - case P2P.send_message(node, %GetTransaction{address: address}) do - {:ok, tx = %Transaction{}} -> - {:ok, tx} - - {:ok, %NotFound{}} -> - {:error, :transaction_not_exists} - - {:ok, %Error{}} -> - {:error, :transaction_invalid} + nodes = + storage_nodes + |> P2P.nearest_nodes() + |> Enum.filter(&Node.locally_available?/1) - {:error, _} -> - get_transaction(rest, address) - end + TransactionChain.fetch_transaction_remotely(address, nodes) end - defp get_transaction([], _), do: {:error, :network_issue} - @doc """ Send a new transaction in the network to be mined. The current node will act as welcome node """ @@ -138,61 +111,32 @@ defmodule Archethic do | {:error, :transaction_not_exists} | {:error, :transaction_invalid} | {:error, :network_issue} - def get_last_transaction(address) do - address - |> Election.chain_storage_nodes(P2P.available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) - |> get_last_transaction(address) - end - - defp get_last_transaction([node | rest], address) do - case P2P.send_message(node, %GetLastTransaction{address: address}) do - {:ok, tx = %Transaction{}} -> - {:ok, tx} - - {:ok, %NotFound{}} -> - {:error, :transaction_not_exists} - - {:ok, %Error{}} -> - {:error, :transaction_invalid} - - {:error, _} -> - get_last_transaction(rest, address) + def get_last_transaction(address) when is_binary(address) do + case get_last_transaction_address(address) do + {:ok, last_address} -> + nodes = + last_address + |> Election.chain_storage_nodes(P2P.available_nodes()) + |> P2P.nearest_nodes() + |> Enum.filter(&Node.locally_available?/1) + + TransactionChain.fetch_transaction_remotely(last_address, nodes) + + {:error, :network_issue} = e -> + e end end - defp get_last_transaction([], _), do: {:error, :network_issue} - @doc """ Retrieve the last transaction address for a chain from the closest nodes """ @spec get_last_transaction_address(address :: binary()) :: {:ok, binary()} | {:error, :network_issue} - def get_last_transaction_address(address) do - address - |> Election.chain_storage_nodes(P2P.available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) - |> get_last_transaction_address(address) - end - - defp get_last_transaction_address([node | rest], address) do - case P2P.send_message(node, %GetLastTransactionAddress{ - address: address, - timestamp: DateTime.utc_now() - }) do - {:ok, %LastTransactionAddress{address: last_address}} -> - {:ok, last_address} - - {:error, _} -> - get_last_transaction_address(rest, address) - end + def get_last_transaction_address(address) when is_binary(address) do + TransactionChain.resolve_last_address(address) end - defp get_last_transaction_address([], _), do: {:error, :network_issue} - @doc """ Retrieve the balance from an address from the closest nodes """ @@ -223,66 +167,40 @@ defmodule Archethic do @spec get_transaction_inputs(binary()) :: {:ok, list(TransactionInput.t())} | {:error, :network_issue} def get_transaction_inputs(address) when is_binary(address) do - address - |> Election.chain_storage_nodes(P2P.available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) - |> get_transaction_inputs(address) - end - - defp get_transaction_inputs([node | rest], address) do - case P2P.send_message(node, %GetTransactionInputs{address: address}) do - {:ok, %TransactionInputList{inputs: inputs}} -> - {:ok, inputs} + nodes = + address + |> Election.chain_storage_nodes(P2P.available_nodes()) + |> P2P.nearest_nodes() + |> Enum.filter(&Node.locally_available?/1) - {:error, _} -> - get_transaction_inputs(rest, address) - end + TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now()) end - defp get_transaction_inputs([], _), do: {:error, :network_issue} - @doc """ Retrieve a transaction chain based on an address from the closest nodes """ @spec get_transaction_chain(binary()) :: {:ok, list(Transaction.t())} | {:error, :network_issue} def get_transaction_chain(address) when is_binary(address) do - local_available_nodes = locally_available_nodes(address) - get_transaction_chain(local_available_nodes, address) - end - - defp get_transaction_chain(nodes, address, opts \\ [], acc \\ []) - - defp get_transaction_chain([node | rest], address, opts, acc) do - case P2P.send_message(node, %GetTransactionChain{ - address: address, - paging_state: Keyword.get(opts, :paging_state) - }) do - {:ok, %TransactionList{transactions: transactions, more?: false}} -> - {:ok, Enum.uniq_by(acc ++ transactions, & &1.address)} - - {:ok, %TransactionList{transactions: transactions, more?: true, paging_state: paging_state}} -> - get_transaction_chain( - [node | rest], - address, - [paging_state: paging_state], - Enum.uniq_by(acc ++ transactions, & &1.address) - ) + nodes = + address + |> Election.chain_storage_nodes(P2P.available_nodes()) + |> P2P.nearest_nodes() + |> Enum.filter(&Node.locally_available?/1) - {:error, _} -> - get_transaction_chain(rest, address, opts, acc) + try do + chain = + address + |> TransactionChain.stream_remotely(nodes) + |> Enum.to_list() + |> List.flatten() + + {:ok, chain} + catch + _ -> + {:error, :network_issue} end end - defp get_transaction_chain([], _, _, _), do: {:error, :network_issue} - - defp locally_available_nodes(address) do - address - |> Election.chain_storage_nodes(P2P.available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) - end - @doc """ Retrieve a transaction chain based on an address from the closest nodes by setting `paging_address as an offset address. @@ -290,50 +208,37 @@ defmodule Archethic do @spec get_transaction_chain_by_paging_address(binary(), binary()) :: {:ok, list(Transaction.t())} | {:error, :network_issue} def get_transaction_chain_by_paging_address(address, paging_address) when is_binary(address) do - options = [paging_state: paging_address] - local_available_nodes = locally_available_nodes(address) - transaction_chain_by_paging_address(local_available_nodes, address, options) - end + nodes = + address + |> Election.chain_storage_nodes(P2P.available_nodes()) + |> P2P.nearest_nodes() + |> Enum.filter(&Node.locally_available?/1) - defp transaction_chain_by_paging_address([node | rest], address, options) do - case P2P.send_message(node, %GetTransactionChain{ - address: address, - paging_state: Keyword.get(options, :paging_state) - }) do - {:ok, %TransactionList{transactions: transactions}} -> - {:ok, transactions} + try do + chain_page = + address + |> TransactionChain.stream_remotely(nodes, paging_address) + |> Enum.at(0) - {:error, _} -> - transaction_chain_by_paging_address(rest, address, options) + {:ok, chain_page} + catch + _ -> + {:error, :network_issue} end end - defp transaction_chain_by_paging_address([], _address, _options) do - {:error, :network_issue} - end - @doc """ Retrieve the number of transaction in a transaction chain from the closest nodes """ @spec get_transaction_chain_length(binary()) :: {:ok, non_neg_integer()} | {:error, :network_issue} def get_transaction_chain_length(address) when is_binary(address) do - address - |> Election.chain_storage_nodes(P2P.available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) - |> get_transaction_chain_length(address) - end - - defp get_transaction_chain_length([node | rest], address) do - case P2P.send_message(node, %GetTransactionChainLength{address: address}) do - {:ok, %TransactionChainLength{length: length}} -> - {:ok, length} + nodes = + address + |> Election.chain_storage_nodes(P2P.available_nodes()) + |> P2P.nearest_nodes() + |> Enum.filter(&Node.locally_available?/1) - {:error, _} -> - get_transaction_chain_length(rest, address) - end + TransactionChain.fetch_size_remotely(address, nodes) end - - defp get_transaction_chain_length([], _), do: {:error, :network_issue} end diff --git a/lib/archethic/beacon_chain/slot/validation.ex b/lib/archethic/beacon_chain/slot/validation.ex index 002d5c6e6..9ee3e6af3 100644 --- a/lib/archethic/beacon_chain/slot/validation.ex +++ b/lib/archethic/beacon_chain/slot/validation.ex @@ -49,7 +49,7 @@ defmodule Archethic.BeaconChain.Slot.Validation do with :ok <- ReplicationAttestation.validate(attestation), - :ok <- check_transaction_summary(storage_nodes, address, tx_summary) do + :ok <- check_transaction_summary(storage_nodes, tx_summary) do true else {:error, reason} -> @@ -62,39 +62,56 @@ defmodule Archethic.BeaconChain.Slot.Validation do end end - defp check_transaction_summary(nodes, address, expected_summary, timeout \\ 500) + defp check_transaction_summary(nodes, expected_summary, timeout \\ 500) + + defp check_transaction_summary([], _, _), do: {:error, :network_issue} defp check_transaction_summary( - [node | rest], - address, - expected_summary, - timeout + nodes, + expected_summary = %TransactionSummary{ + address: address, + type: type + }, + _timeout ) do - case P2P.send_message(node, %GetTransactionSummary{address: address}, timeout) do + conflict_resolver = fn results -> + case Enum.find(results, &match?(%TransactionSummary{address: ^address, type: ^type}, &1)) do + nil -> + %NotFound{} + + tx_summary -> + tx_summary + end + end + + case P2P.quorum_read( + nodes, + %GetTransactionSummary{address: address}, + conflict_resolver + ) do {:ok, ^expected_summary} -> :ok {:ok, recv = %TransactionSummary{}} -> - Logger.debug( - "Transaction summary received is different #{inspect(recv)} - expect #{inspect(expected_summary)}" + Logger.warning( + "Transaction summary received is different #{inspect(recv)} - expect #{inspect(expected_summary)}", + transaction_address: Base.encode16(address), + transaction_type: type ) - {:error, :invalid_summary} - {:ok, %NotFound{}} -> - Logger.debug("Transaction summary was not found at #{Node.endpoint(node)}") - check_transaction_summary(rest, address, expected_summary, timeout) + Logger.warning("Transaction summary was not found", + transaction_address: Base.encode16(address), + transaction_type: type + ) - {:error, :timeout} -> - check_transaction_summary(rest, address, expected_summary, trunc(timeout * 1.5)) + {:error, :invalid_summary} - {:error, :closed} -> - check_transaction_summary(rest, address, expected_summary, timeout) + {:error, :network_issue} -> + {:error, :network_issue} end end - defp check_transaction_summary([], _, _, _), do: {:error, :network_issue} - defp transaction_storage_nodes(address, timestamp) do authorized_nodes = case P2P.authorized_nodes(timestamp) do diff --git a/lib/archethic/mining/transaction_context.ex b/lib/archethic/mining/transaction_context.ex index fe3f6de86..51e5a1e64 100644 --- a/lib/archethic/mining/transaction_context.ex +++ b/lib/archethic/mining/transaction_context.ex @@ -10,13 +10,15 @@ defmodule Archethic.Mining.TransactionContext do alias Archethic.Election alias Archethic.P2P + alias Archethic.P2P.Message.Ok + alias Archethic.P2P.Message.Ping alias Archethic.P2P.Node - alias __MODULE__.DataFetcher alias __MODULE__.NodeDistribution alias Archethic.TaskSupervisor + alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput @@ -59,15 +61,14 @@ defmodule Archethic.Mining.TransactionContext do utxo_task = request_utxo(previous_address, unspent_outputs_nodes_split) nodes_view_task = request_nodes_view(node_public_keys) - {prev_tx, prev_tx_node_involved} = await_previous_tx_request(prev_tx_task) - - {:ok, utxos, utxo_node_involved} = Task.await(utxo_task) + prev_tx = Task.await(prev_tx_task) + utxos = Task.await(utxo_task) nodes_view = Task.await(nodes_view_task) - involved_nodes = - [prev_tx_node_involved, utxo_node_involved] - |> Enum.filter(& &1) - |> P2P.distinct_nodes() + # involved_nodes = + # [prev_tx_node_involved, utxo_node_involved] + # |> Enum.filter(& &1) + # |> P2P.distinct_nodes() %{ chain_nodes_view: chain_storage_nodes_view, @@ -81,7 +82,8 @@ defmodule Archethic.Mining.TransactionContext do io_storage_node_public_keys ) - {prev_tx, utxos, involved_nodes, chain_storage_nodes_view, beacon_storage_nodes_view, + # TODO: remove the invovled nodes as not used anymore + {prev_tx, utxos, [], chain_storage_nodes_view, beacon_storage_nodes_view, io_storage_nodes_view} end @@ -98,34 +100,45 @@ defmodule Archethic.Mining.TransactionContext do defp request_previous_tx(previous_address, nodes) do Task.Supervisor.async(TaskSupervisor, fn -> - DataFetcher.fetch_previous_transaction(previous_address, nodes) - end) - end - - defp await_previous_tx_request(task) do - case Task.await(task) do - {:ok, tx, node_involved} -> - {tx, node_involved} - - {:error, :not_found} -> - {nil, nil} + case TransactionChain.fetch_transaction_remotely(previous_address, nodes) do + {:ok, tx} -> + tx - {:error, :invalid_transaction} -> - raise "Invalid previous transaction" - end + {:error, :transaction_not_exists} -> + nil + end + end) end defp request_utxo(previous_address, nodes) do - TaskSupervisor - |> Task.Supervisor.async(fn -> - DataFetcher.fetch_unspent_outputs(previous_address, nodes) + Task.Supervisor.async(TaskSupervisor, fn -> + {:ok, utxos} = + TransactionChain.fetch_unspent_outputs_remotely( + previous_address, + nodes + ) + + utxos end) end defp request_nodes_view(node_public_keys) do - TaskSupervisor - |> Task.Supervisor.async(fn -> - DataFetcher.fetch_p2p_view(node_public_keys) + Task.Supervisor.async(TaskSupervisor, fn -> + Task.Supervisor.async_stream_nolink( + TaskSupervisor, + node_public_keys, + fn node_public_key -> + {node_public_key, P2P.send_message(node_public_key, %Ping{}, 500)} + end, + on_timeout: :kill_task, + timeout: 500 + ) + |> Stream.filter(&match?({:ok, _}, &1)) + |> Enum.map(fn + {:ok, {node_public_key, {:ok, %Ok{}}}} -> {node_public_key, true} + {:ok, {node_public_key, _}} -> {node_public_key, false} + end) + |> Enum.into(%{}) end) end diff --git a/lib/archethic/mining/transaction_context/data_fetcher.ex b/lib/archethic/mining/transaction_context/data_fetcher.ex deleted file mode 100644 index 2dbc077c2..000000000 --- a/lib/archethic/mining/transaction_context/data_fetcher.ex +++ /dev/null @@ -1,91 +0,0 @@ -defmodule Archethic.Mining.TransactionContext.DataFetcher do - @moduledoc false - - alias Archethic.Crypto - - alias Archethic.P2P - alias Archethic.P2P.Message.Error - alias Archethic.P2P.Message.GetTransaction - alias Archethic.P2P.Message.GetUnspentOutputs - alias Archethic.P2P.Message.NotFound - alias Archethic.P2P.Message.Ok - alias Archethic.P2P.Message.Ping - alias Archethic.P2P.Message.UnspentOutputList - alias Archethic.P2P.Node - - alias Archethic.TaskSupervisor - - alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput - - require Logger - - @doc """ - Retrieve the previous transaction and the first node which replied - """ - @spec fetch_previous_transaction(binary(), list(Node.t())) :: - {:ok, Transaction.t(), Node.t()} - | {:error, :not_found} - | {:error, :invalid_transaction} - | {:error, :network_issue} - def fetch_previous_transaction(previous_address, [node | rest]) do - message = %GetTransaction{address: previous_address} - - case P2P.send_message(node, message, 500) do - {:ok, tx = %Transaction{}} -> - {:ok, tx, node} - - {:ok, %NotFound{}} -> - {:error, :not_found} - - {:ok, %Error{reason: :invalid_transaction}} -> - {:error, :invalid_transaction} - - {:error, _} -> - fetch_previous_transaction(previous_address, rest) - end - end - - def fetch_previous_transaction(_, []), do: {:error, :network_issue} - - @doc """ - Retrieve the previous unspent outputs and the first node which replied. - """ - @spec fetch_unspent_outputs(address :: binary(), storage_nodes :: list(Node.t())) :: - {:ok, list(UnspentOutput.t()), Node.t()} | {:error, :network_issue} - def fetch_unspent_outputs(previous_address, [node | rest]) do - message = %GetUnspentOutputs{address: previous_address} - - case P2P.send_message(node, message, 500) do - {:ok, %UnspentOutputList{unspent_outputs: utxos}} -> - {:ok, utxos, node} - - {:error, _} -> - fetch_unspent_outputs(previous_address, rest) - end - end - - def fetch_unspent_outputs(_previous_address, []), do: {:error, :network_issue} - - @doc """ - Request to a set a storage nodes the P2P view of some nodes and the first node which replied - """ - @spec fetch_p2p_view(node_public_keys :: list(Crypto.key())) :: %{Crypto.key() => boolean()} - def fetch_p2p_view(node_public_keys) do - Task.Supervisor.async_stream_nolink( - TaskSupervisor, - node_public_keys, - fn node_public_key -> - {node_public_key, P2P.send_message(node_public_key, %Ping{}, 500)} - end, - on_timeout: :kill_task, - timeout: 500 - ) - |> Stream.filter(&match?({:ok, _}, &1)) - |> Enum.map(fn - {:ok, {node_public_key, {:ok, %Ok{}}}} -> {node_public_key, true} - {:ok, {node_public_key, _}} -> {node_public_key, false} - end) - |> Enum.into(%{}) - end -end diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index 496c248f9..757af78b4 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -529,4 +529,83 @@ defmodule Archethic.P2P do :lists.flatten([list | [node]]) end end + + @doc """ + Send a message to a list of nodes and perform a read quorum + """ + @spec quorum_read( + node_list :: list(Node.t()), + message :: Message.t(), + conflict_resolver :: (list(Message.t()) -> Message.t()), + consistency_level :: pos_integer() + ) :: + {:ok, Message.t()} | {:error, :network_issue} + def quorum_read( + nodes, + message, + conflict_resolver \\ fn results -> List.first(results) end, + consistency_level \\ 2 + ) + + def quorum_read([], _, _, _), do: {:error, :network_issue} + + def quorum_read( + [node | rest], + message, + conflict_resolver, + consistency_level + ) do + # We request the first node and + case send_message(node, message) do + {:ok, result} -> + # Then we try to reach performing monotonic quorum read (using the conflict resolver) + do_quorum_read( + rest, + message, + conflict_resolver, + consistency_level, + result + ) + + {:error, _} -> + quorum_read(rest, message, conflict_resolver, consistency_level) + end + end + + defp do_quorum_read([], _message, _conflict_resolver, _consistency_level, prior_result), + do: {:ok, prior_result} + + defp do_quorum_read(nodes, message, conflict_resolver, consistency_level, prior_result) do + # We determine how many nodes to fetch for the quorum from the consistency level + {group, rest} = Enum.split(nodes, consistency_level) + + results = + Task.Supervisor.async_stream_nolink( + TaskSupervisor, + group, + &send_message(&1, message), + ordered: false, + on_timeout: :kill_task + ) + |> Stream.filter(&match?({:ok, {:ok, _}}, &1)) + |> Stream.map(fn {:ok, {:ok, res}} -> res end) + |> Enum.to_list() + + # If no nodes answered we try another group + if Enum.empty?(results) do + do_quorum_read(rest, message, conflict_resolver, consistency_level, prior_result) + else + distinct_elems = Enum.dedup([prior_result | results]) + + # If the results are the same, then we reached consistency + if length(distinct_elems) == 1 do + {:ok, prior_result} + else + # If the results differ, we can apply a conflict resolver to merge the result into + # a consistent response + resolved_result = conflict_resolver.(distinct_elems) + {:ok, resolved_result} + end + end + end end diff --git a/lib/archethic/replication.ex b/lib/archethic/replication.ex index d2bbdb56f..9b5c2b48d 100644 --- a/lib/archethic/replication.ex +++ b/lib/archethic/replication.ex @@ -50,9 +50,6 @@ defmodule Archethic.Replication do It will download the transaction chain and unspents to validate the new transaction and store the new transaction chain and update the internal ledger and views - - Options: - - self_repair?: Determines if the replication is from a self repair cycle. This switch will be determine to fetch unspent outputs or transaction inputs for a chain role validation """ @spec validate_and_store_transaction_chain( validated_tx :: Transaction.t(), @@ -83,20 +80,18 @@ defmodule Archethic.Replication do start_time = System.monotonic_time() - self_repair? = Keyword.get(opts, :self_repair?, false) - Logger.debug("Retrieve chain and unspent outputs...", transaction_address: Base.encode16(address), transaction_type: type ) - {previous_tx, inputs_unspent_outputs} = fetch_context(tx, self_repair?) + {previous_tx, inputs} = fetch_context(tx) # Validate the transaction and check integrity from the previous transaction case TransactionValidator.validate( tx, previous_tx, - Enum.to_list(inputs_unspent_outputs) + Enum.to_list(inputs) ) do :ok -> # Stream the insertion of the chain @@ -203,18 +198,15 @@ defmodule Archethic.Replication do end end - defp fetch_context( - tx = %Transaction{type: type}, - self_repair? - ) do + defp fetch_context(tx = %Transaction{type: type}) do if Transaction.network_type?(type) do - fetch_context_for_network_transaction(tx, self_repair?) + fetch_context_for_network_transaction(tx) else - fetch_context_for_regular_transaction(tx, self_repair?) + fetch_context_for_regular_transaction(tx) end end - defp fetch_context_for_network_transaction(tx = %Transaction{}, self_repair?) do + defp fetch_context_for_network_transaction(tx = %Transaction{}) do previous_address = Transaction.previous_address(tx) Logger.debug( @@ -245,12 +237,12 @@ defmodule Archethic.Replication do transaction_type: tx.type ) - inputs_unspent_outputs = fetch_inputs_unspent_outputs(tx, self_repair?) + inputs = fetch_inputs(tx) - {previous_transaction, inputs_unspent_outputs} + {previous_transaction, inputs} end - defp fetch_context_for_regular_transaction(tx = %Transaction{}, self_repair?) do + defp fetch_context_for_regular_transaction(tx = %Transaction{}) do previous_address = Transaction.previous_address(tx) t1 = @@ -264,26 +256,20 @@ defmodule Archethic.Replication do TransactionContext.fetch_transaction(previous_address) end) - t2 = - Task.Supervisor.async(TaskSupervisor, fn -> - fetch_inputs_unspent_outputs(tx, self_repair?) - end) + t2 = Task.Supervisor.async(TaskSupervisor, fn -> fetch_inputs(tx) end) previous_transaction = Task.await(t1) - inputs_unspent_outputs = Task.await(t2) + inputs = Task.await(t2) Logger.debug("Previous transaction #{inspect(previous_transaction)}", transaction_address: Base.encode16(tx.address), transaction_type: tx.type ) - {previous_transaction, inputs_unspent_outputs} + {previous_transaction, inputs} end - defp fetch_inputs_unspent_outputs( - tx = %Transaction{validation_stamp: %ValidationStamp{timestamp: tx_time}}, - _self_repair? = true - ) do + defp fetch_inputs(tx = %Transaction{validation_stamp: %ValidationStamp{timestamp: tx_time}}) do previous_address = Transaction.previous_address(tx) Logger.debug( @@ -302,26 +288,6 @@ defmodule Archethic.Replication do end) end - defp fetch_inputs_unspent_outputs(tx = %Transaction{}, _self_repair? = false) do - previous_address = Transaction.previous_address(tx) - - Logger.debug( - "Fetch unspent outputs for #{Base.encode16(previous_address)}", - transaction_address: Base.encode16(tx.address), - transaction_type: tx.type - ) - - previous_address - |> TransactionContext.fetch_unspent_outputs() - |> tap(fn utxos -> - Logger.debug( - "Got #{inspect(utxos)} for #{Base.encode16(previous_address)}", - transaction_address: Base.encode16(tx.address), - transaction_type: tx.type - ) - end) - end - defp stream_previous_chain(tx = %Transaction{type: type}) do previous_address = Transaction.previous_address(tx) diff --git a/lib/archethic/replication/transaction_context.ex b/lib/archethic/replication/transaction_context.ex index 4d290a979..1c62aa972 100644 --- a/lib/archethic/replication/transaction_context.ex +++ b/lib/archethic/replication/transaction_context.ex @@ -6,18 +6,10 @@ defmodule Archethic.Replication.TransactionContext do alias Archethic.Election alias Archethic.P2P - alias Archethic.P2P.Message.GetTransaction - alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.GetTransactionInputs - alias Archethic.P2P.Message.GetUnspentOutputs - alias Archethic.P2P.Message.NotFound - alias Archethic.P2P.Message.TransactionInputList - alias Archethic.P2P.Message.TransactionList - alias Archethic.P2P.Message.UnspentOutputList alias Archethic.P2P.Node + alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput alias Archethic.TransactionChain.TransactionInput require Logger @@ -27,38 +19,17 @@ defmodule Archethic.Replication.TransactionContext do """ @spec fetch_transaction(address :: Crypto.versioned_hash()) :: Transaction.t() | nil def fetch_transaction(address) when is_binary(address) do - case replication_nodes(address) do - [] -> - nil - - nodes -> - fetch_transaction(nodes, address) - end - end + nodes = replication_nodes(address) - defp fetch_transaction( - _nodes = [node | rest], - address - ) do - message = %GetTransaction{ - address: address - } - - case P2P.send_message(node, message) do - {:ok, tx = %Transaction{}} -> + case TransactionChain.fetch_transaction_remotely(address, nodes) do + {:ok, tx} -> tx - {:ok, %NotFound{}} -> + {:error, :transaction_not_exists} -> nil - - {:error, _} -> - fetch_transaction(rest, address) end end - defp fetch_transaction([], _address), - do: raise("Cannot fetch transaction chain") - @doc """ Stream transaction chain """ @@ -70,125 +41,22 @@ defmodule Archethic.Replication.TransactionContext do [] nodes -> - Stream.resource( - fn -> {address, nil, 0} end, - fn - {:end, size} -> - Logger.debug("Size of the chain retrieved: #{size}", - transaction_address: Base.encode16(address) - ) - - {:halt, address} - - {address, paging_state, size} -> - do_stream_chain(nodes, address, paging_state, size) - end, - fn _ -> :ok end - ) - end - end - - defp do_stream_chain(nodes, address, paging_state, size) do - case fetch_transaction_chain(nodes, address, paging_state) do - {transactions, false, _} -> - {[transactions], {:end, size + length(transactions)}} - - {transactions, true, paging_state} -> - {[transactions], {address, paging_state, size + length(transactions)}} + TransactionChain.stream_remotely(address, nodes) end end - defp fetch_transaction_chain( - _nodes = [node | rest], - address, - paging_state - ) do - message = %GetTransactionChain{ - address: address, - paging_state: paging_state - } - - case P2P.send_message(node, message) do - {:ok, - %TransactionList{transactions: transactions, more?: more?, paging_state: paging_state}} -> - {transactions, more?, paging_state} - - {:error, _} -> - fetch_transaction_chain(rest, address, paging_state) - end - end - - defp fetch_transaction_chain([], _address, _paging_state), - do: raise("Cannot fetch transaction chain") - - @doc """ - Fetch the transaction unspent outputs - """ - @spec fetch_unspent_outputs(address :: Crypto.versioned_hash()) :: - list(UnspentOutput.t()) - def fetch_unspent_outputs(address) when is_binary(address) do - case replication_nodes(address) do - [] -> - [] - - nodes -> - do_fetch_unspent_outputs(nodes, address) - end - end - - defp do_fetch_unspent_outputs(nodes, address, prev_result \\ nil) - - defp do_fetch_unspent_outputs([node | rest], address, _prev_result) do - case P2P.send_message(node, %GetUnspentOutputs{address: address}) do - {:ok, %UnspentOutputList{unspent_outputs: []}} -> - do_fetch_unspent_outputs(rest, address, []) - - {:ok, %UnspentOutputList{unspent_outputs: unspent_outputs}} -> - unspent_outputs - - {:error, _} -> - do_fetch_unspent_outputs(rest, address) - end - end - - defp do_fetch_unspent_outputs([], _, nil), do: raise("Cannot fetch unspent outputs") - defp do_fetch_unspent_outputs([], _, prev_result), do: prev_result - @doc """ Fetch the transaction inputs for a transaction address at a given time """ @spec fetch_transaction_inputs(address :: Crypto.versioned_hash(), DateTime.t()) :: list(TransactionInput.t()) def fetch_transaction_inputs(address, timestamp = %DateTime{}) when is_binary(address) do - case replication_nodes(address) do - [] -> - [] - - nodes -> - nodes - |> do_fetch_inputs(address) - |> Enum.filter(&(DateTime.diff(&1.timestamp, timestamp) <= 0)) - end - end - - defp do_fetch_inputs(nodes, address, prev_result \\ nil) - - defp do_fetch_inputs([node | rest], address, _prev_result) do - case P2P.send_message(node, %GetTransactionInputs{address: address}) do - {:ok, %TransactionInputList{inputs: []}} -> - do_fetch_inputs(rest, address, []) + nodes = replication_nodes(address) - {:ok, %TransactionInputList{inputs: inputs}} -> - inputs - - {:error, _} -> - do_fetch_inputs(rest, address) - end + {:ok, inputs} = TransactionChain.fetch_inputs_remotely(address, nodes, timestamp) + inputs end - defp do_fetch_inputs([], _, nil), do: raise("Cannot fetch inputs") - defp do_fetch_inputs([], _, prev_result), do: prev_result - defp replication_nodes(address) do address # returns the storage nodes for the transaction chain based on the transaction address diff --git a/lib/archethic/replication/transaction_validator.ex b/lib/archethic/replication/transaction_validator.ex index 96b9cd62e..1b97af389 100644 --- a/lib/archethic/replication/transaction_validator.ex +++ b/lib/archethic/replication/transaction_validator.ex @@ -20,7 +20,6 @@ defmodule Archethic.Replication.TransactionValidator do alias Archethic.TransactionChain.Transaction.CrossValidationStamp alias Archethic.TransactionChain.Transaction.ValidationStamp alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations - alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput alias Archethic.TransactionChain.TransactionInput require Logger @@ -50,15 +49,15 @@ defmodule Archethic.Replication.TransactionValidator do @spec validate( validated_transaction :: Transaction.t(), previous_transaction :: Transaction.t() | nil, - inputs_outputs :: list(UnspentOutput.t()) | list(TransactionInput.t()) + inputs_outputs :: list(TransactionInput.t()) ) :: :ok | {:error, error()} def validate( tx = %Transaction{}, previous_transaction, - inputs_outputs + inputs ) do - with :ok <- valid_transaction(tx, inputs_outputs, true), + with :ok <- valid_transaction(tx, inputs, true), :ok <- validate_inheritance(tx, previous_transaction) do validate_chain(tx, previous_transaction) end @@ -91,12 +90,11 @@ defmodule Archethic.Replication.TransactionValidator do @spec validate(Transaction.t()) :: :ok | {:error, error()} def validate(tx = %Transaction{}), do: valid_transaction(tx, [], false) - defp valid_transaction(tx = %Transaction{}, previous_inputs_unspent_outputs, chain_node?) - when is_list(previous_inputs_unspent_outputs) do + defp valid_transaction(tx = %Transaction{}, inputs, chain_node?) when is_list(inputs) do with :ok <- validate_consensus(tx), :ok <- validate_validation_stamp(tx) do if chain_node? do - check_unspent_outputs(tx, previous_inputs_unspent_outputs) + check_inputs(tx, inputs) else :ok end @@ -340,9 +338,9 @@ defmodule Archethic.Replication.TransactionValidator do {:error, {:transaction_errors_detected, errors}} end - defp check_unspent_outputs( + defp check_inputs( tx = %Transaction{type: type, address: address}, - previous_inputs_unspent_outputs + inputs ) do cond do address == Bootstrap.genesis_address() -> @@ -352,24 +350,24 @@ defmodule Archethic.Replication.TransactionValidator do :ok true -> - do_check_unspent_outputs(tx, previous_inputs_unspent_outputs) + do_check_inputs(tx, inputs) end end - defp do_check_unspent_outputs( + defp do_check_inputs( tx = %Transaction{ validation_stamp: %ValidationStamp{ ledger_operations: ops = %LedgerOperations{} } }, - previous_inputs_unspent_outputs + inputs ) do - with :ok <- validate_unspent_outputs(tx, previous_inputs_unspent_outputs) do - validate_funds(ops, previous_inputs_unspent_outputs) + with :ok <- validate_inputs(tx, inputs) do + validate_funds(ops, inputs) end end - defp validate_unspent_outputs( + defp validate_inputs( tx = %Transaction{ validation_stamp: %ValidationStamp{ ledger_operations: %LedgerOperations{ @@ -379,7 +377,7 @@ defmodule Archethic.Replication.TransactionValidator do } } }, - previous_inputs_unspent_outputs + inputs ) do %LedgerOperations{unspent_outputs: expected_unspent_outputs} = %LedgerOperations{ @@ -387,7 +385,7 @@ defmodule Archethic.Replication.TransactionValidator do transaction_movements: transaction_movements } |> LedgerOperations.from_transaction(tx) - |> LedgerOperations.consume_inputs(tx.address, previous_inputs_unspent_outputs) + |> LedgerOperations.consume_inputs(tx.address, inputs) same? = Enum.all?(next_unspent_outputs, fn %{amount: amount, from: from} -> @@ -403,12 +401,12 @@ defmodule Archethic.Replication.TransactionValidator do transaction_type: tx.type ) - {:error, :invalid_unspent_outputs} + {:error, :invalid_inputs} end end - defp validate_funds(ops = %LedgerOperations{}, previous_inputs_unspent_outputs) do - if LedgerOperations.sufficient_funds?(ops, previous_inputs_unspent_outputs) do + defp validate_funds(ops = %LedgerOperations{}, inputs) do + if LedgerOperations.sufficient_funds?(ops, inputs) do :ok else {:error, :insufficient_funds} diff --git a/lib/archethic/reward.ex b/lib/archethic/reward.ex index 43972439c..b73f3f40f 100644 --- a/lib/archethic/reward.ex +++ b/lib/archethic/reward.ex @@ -66,7 +66,7 @@ defmodule Archethic.Reward do end defp get_transactions_after(address, date) do - last_address = TransactionChain.resolve_last_address(address, DateTime.utc_now()) + {:ok, last_address} = TransactionChain.resolve_last_address(address, DateTime.utc_now()) last_address |> Election.chain_storage_nodes(P2P.available_nodes()) diff --git a/lib/archethic/self_repair/sync/beacon_summary_handler.ex b/lib/archethic/self_repair/sync/beacon_summary_handler.ex index daf4c6243..25c9a6dda 100644 --- a/lib/archethic/self_repair/sync/beacon_summary_handler.ex +++ b/lib/archethic/self_repair/sync/beacon_summary_handler.ex @@ -34,28 +34,31 @@ defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler do def get_full_beacon_summary(summary_time, subset, nodes) do summary_address = Crypto.derive_beacon_chain_address(subset, summary_time, true) - Task.Supervisor.async_stream_nolink( - TaskSupervisor, - Enum.filter(nodes, &Node.locally_available?/1), - fn node -> - P2P.send_message(node, %GetBeaconSummary{address: summary_address}) - end, - on_timeout: :kill_task, - ordered: false - ) - |> Enum.filter(&match?({:ok, {:ok, %BeaconSummary{}}}, &1)) - |> Enum.map(fn {:ok, {:ok, summary}} -> summary end) - |> Enum.reject(&BeaconSummary.empty?/1) - |> Enum.reduce( - %{ + conflict_resolver = fn results -> + acc = %{ transaction_attestations: [], node_availabilities: [], - node_average_availabilities: [], - end_of_node_synchronizations: [] - }, - &do_reduce_beacon_summaries/2 - ) - |> aggregate(summary_time, subset) + node_average_availabilities: [] + } + + results + |> Enum.filter(&match?(%BeaconSummary{}, &1)) + |> Enum.reduce(acc, &do_reduce_beacon_summaries/2) + |> aggregate_summary(summary_time, subset) + end + + case P2P.quorum_read( + nodes, + %GetBeaconSummary{address: summary_address}, + conflict_resolver, + length(nodes) + ) do + {:ok, summary = %BeaconSummary{}} -> + summary + + _ -> + %BeaconSummary{summary_time: summary_time, subset: subset} + end end defp do_reduce_beacon_summaries( @@ -90,7 +93,7 @@ defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler do end end - defp aggregate( + defp aggregate_summary( %{ transaction_attestations: transaction_attestations, node_availabilities: node_availabilities, @@ -148,26 +151,29 @@ defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler do def download_summary(_beacon_address, [], _), do: {:ok, %NotFound{}} def download_summary(beacon_address, nodes, patch) do - nodes - |> P2P.nearest_nodes(patch) - |> do_get_download_summary(beacon_address, nil) - end - - defp do_get_download_summary([node | rest], address, prev_result) do - case P2P.send_message(node, %GetBeaconSummary{address: address}) do - {:ok, summary = %BeaconSummary{}} -> - {:ok, summary} + conflict_resolver = fn results -> + acc = %{ + transaction_attestations: [], + node_availabilities: [], + node_average_availabilities: [] + } - {:ok, %NotFound{}} -> - do_get_download_summary(rest, address, %NotFound{}) + %{transaction_attestations: transaction_attestations} = + results + |> Enum.filter(&match?(%BeaconSummary{}, &1)) + |> Enum.reduce(acc, &do_reduce_beacon_summaries/2) - {:error, _} -> - do_get_download_summary(rest, address, prev_result) + %BeaconSummary{transaction_attestations: transaction_attestations} end - end - defp do_get_download_summary([], _, %NotFound{}), do: {:ok, %NotFound{}} - defp do_get_download_summary([], _, _), do: {:error, :network_issue} + nodes + |> P2P.nearest_nodes(patch) + |> P2P.quorum_read( + %GetBeaconSummary{address: beacon_address}, + conflict_resolver, + length(nodes) + ) + end @doc """ Process beacon summary to synchronize the transactions involving. diff --git a/lib/archethic/self_repair/sync/beacon_summary_handler/transaction_handler.ex b/lib/archethic/self_repair/sync/beacon_summary_handler/transaction_handler.ex index 2af5ad50a..a4e6296a8 100644 --- a/lib/archethic/self_repair/sync/beacon_summary_handler/transaction_handler.ex +++ b/lib/archethic/self_repair/sync/beacon_summary_handler/transaction_handler.ex @@ -6,11 +6,11 @@ defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler.TransactionHandler do alias Archethic.Election alias Archethic.P2P - alias Archethic.P2P.Message.GetTransaction alias Archethic.P2P.Node alias Archethic.Replication + alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.TransactionSummary @@ -67,34 +67,28 @@ defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandler.TransactionHandler do |> P2P.nearest_nodes() |> Enum.filter(&Node.locally_available?/1) - case fetch_transaction(storage_nodes, address) do + case TransactionChain.fetch_transaction_remotely(address, storage_nodes) do {:ok, tx = %Transaction{}} -> tx - {:error, :network_issue} -> + {:error, :transaction_not_exists} -> Logger.error("Cannot fetch the transaction to sync", transaction_address: Base.encode16(address), transaction_type: type ) - raise "Network issue during during self repair" - end - end + raise "Transaction doesn't exist" - defp fetch_transaction([node | rest], address) do - case P2P.send_message(node, %GetTransaction{address: address}) do - {:ok, tx = %Transaction{}} -> - {:ok, tx} + {:error, :network_issue} -> + Logger.error("Cannot fetch the transaction to sync", + transaction_address: Base.encode16(address), + transaction_type: type + ) - _ -> - fetch_transaction(rest, address) + raise "Network issue during during self repair" end end - defp fetch_transaction([], _) do - {:error, :network_issue} - end - @spec process_transaction(Transaction.t()) :: :ok | {:error, :invalid_transaction} def process_transaction( tx = %Transaction{ diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 0a51bf8e8..c6ef6d060 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -10,8 +10,19 @@ defmodule Archethic.TransactionChain do alias Archethic.Election alias Archethic.P2P + alias Archethic.P2P.Message.Error + alias Archethic.P2P.Message.GetTransaction + alias Archethic.P2P.Message.GetTransactionChain + alias Archethic.P2P.Message.GetTransactionChainLength alias Archethic.P2P.Message.GetLastTransactionAddress + alias Archethic.P2P.Message.GetTransactionInputs + alias Archethic.P2P.Message.GetUnspentOutputs alias Archethic.P2P.Message.LastTransactionAddress + alias Archethic.P2P.Message.NotFound + alias Archethic.P2P.Message.TransactionChainLength + alias Archethic.P2P.Message.TransactionList + alias Archethic.P2P.Message.TransactionInputList + alias Archethic.P2P.Message.UnspentOutputList alias Archethic.P2P.Node alias __MODULE__.MemTables.KOLedger @@ -23,7 +34,9 @@ defmodule Archethic.TransactionChain do alias __MODULE__.Transaction alias __MODULE__.TransactionData alias __MODULE__.Transaction.ValidationStamp + alias __MODULE__.Transaction.ValidationStamp.LedgerOperations.UnspentOutput alias __MODULE__.TransactionSummary + alias __MODULE__.TransactionInput require Logger @@ -478,7 +491,13 @@ defmodule Archethic.TransactionChain do TaskSupervisor, addresses, fn to -> - {to, resolve_last_address(to, time)} + case resolve_last_address(to, time) do + {:ok, resolved} -> + {to, resolved} + + _ -> + {to, to} + end end, on_timeout: :kill_task ) @@ -489,29 +508,44 @@ defmodule Archethic.TransactionChain do @doc """ Retrieve the last address of a chain """ - @spec resolve_last_address(binary(), DateTime.t()) :: binary() - def resolve_last_address(address, timestamp = %DateTime{}) when is_binary(address) do - address - |> Election.chain_storage_nodes(P2P.available_nodes()) - |> P2P.nearest_nodes() - |> Enum.filter(&Node.locally_available?/1) - |> get_last_transaction_address(address, timestamp) + @spec resolve_last_address(binary(), DateTime.t()) :: {:ok, binary()} | {:error, :network_issue} + def resolve_last_address(address, timestamp = %DateTime{} \\ DateTime.utc_now()) + when is_binary(address) do + nodes = + address + |> Election.chain_storage_nodes(P2P.available_nodes()) + |> P2P.nearest_nodes() + |> Enum.filter(&Node.locally_available?/1) + + case fetch_last_address_remotely(address, nodes, timestamp) do + {:ok, last_address} -> + {:ok, last_address} + + {:error, _} = e -> + e + end end - defp get_last_transaction_address([node | rest], address, timestamp) do - message = %GetLastTransactionAddress{address: address, timestamp: timestamp} - - case P2P.send_message(node, message) do - {:ok, %LastTransactionAddress{address: address}} -> - address - - {:error, _} -> - get_last_transaction_address(rest, address, timestamp) + @doc """ + Fetch the last address remotely + """ + @spec fetch_last_address_remotely(binary(), list(Node.t()), DateTime.t()) :: + {:ok, binary()} | {:error, :network_issue} + def fetch_last_address_remotely(address, nodes, timestamp = %DateTime{} \\ DateTime.utc_now()) + when is_binary(address) and is_list(nodes) do + # TODO: implement conflict resolver to get the latest address + case P2P.quorum_read( + nodes, + %GetLastTransactionAddress{address: address, timestamp: timestamp} + ) do + {:ok, %LastTransactionAddress{address: last_address}} -> + {:ok, last_address} + + {:error, :network_issue} = e -> + e end end - defp get_last_transaction_address([], address, _), do: address - @doc """ Get a transaction summary from a transaction address """ @@ -553,4 +587,208 @@ defmodule Archethic.TransactionChain do fn _ -> :ok end ) end + + @doc """ + Fetch transaction remotely + + If the transaction exists, then its value is returned in the shape of `{:ok, transaction}`. + If the transaction doesn't exist, `{:error, :transaction_not_exists}` is returned. + + If no nodes are available to answer the request, `{:error, :network_issue}` is returned. + """ + @spec fetch_transaction_remotely(address :: Crypto.versioned_hash(), list(Node.t())) :: + {:ok, Transaction.t()} + | {:error, :transaction_not_exists} + | {:error, :transaction_invalid} + | {:error, :network_issue} + def fetch_transaction_remotely(_, []), do: {:error, :transaction_not_exists} + + def fetch_transaction_remotely(address, nodes) when is_binary(address) and is_list(nodes) do + conflict_resolver = fn results -> + # Prioritize transactions results over not found + with nil <- Enum.find(results, &match?(%Transaction{}, &1)), + nil <- Enum.find(results, &match?(%Error{}, &1)) do + %NotFound{} + else + res -> + res + end + end + + case P2P.quorum_read( + nodes, + %GetTransaction{address: address}, + conflict_resolver + ) do + {:ok, %NotFound{}} -> + {:error, :transaction_not_exists} + + {:ok, %Error{}} -> + {:error, :transaction_invalid} + + {:ok, tx = %Transaction{}} -> + {:ok, tx} + + {:error, :network_issue} -> + {:error, :network_issue} + end + end + + @doc """ + Stream transaction chain remotely + """ + @spec stream_remotely( + address :: Crypto.versioned_hash(), + list(Node.t()), + paging_state :: nil | binary() + ) :: + Enumerable.t() | list(Transaction.t()) + def stream_remotely(address, nodes, paging_state \\ nil) + def stream_remotely(_, [], _), do: [] + + def stream_remotely(address, nodes, paging_state) + when is_binary(address) and is_list(nodes) do + Stream.resource( + fn -> {address, paging_state, 0} end, + fn + {:end, _size} -> + {:halt, address} + + {address, paging_state, size} -> + do_stream_chain(nodes, address, paging_state, size) + end, + fn _ -> :ok end + ) + end + + defp do_stream_chain(nodes, address, paging_state, size) do + case fetch_transaction_chain(nodes, address, paging_state) do + {transactions, false, _} -> + {[transactions], {:end, size + length(transactions)}} + + {transactions, true, paging_state} -> + {[transactions], {address, paging_state, size + length(transactions)}} + end + end + + defp fetch_transaction_chain( + nodes, + address, + paging_state + ) do + conflict_resolver = fn results -> + results + |> Enum.sort( + &((&1.more? and !&2.more?) or length(&1.transactions) > length(&2.transactions)) + ) + |> List.first() + end + + case P2P.quorum_read( + nodes, + %GetTransactionChain{address: address, paging_state: paging_state}, + conflict_resolver + ) do + {:ok, + %TransactionList{transactions: transactions, more?: more?, paging_state: paging_state}} -> + {transactions, more?, paging_state} + + {:error, :network_issue} -> + raise "Cannot fetch transaction chain" + end + end + + @doc """ + Fetch the transaction inputs for a transaction address at a given time + + If the inputs exist, then they are returned in the shape of `{:ok, inputs}`. + If no nodes are able to answer the request, `{:error, :network_issue}` is returned. + """ + @spec fetch_inputs_remotely(address :: Crypto.versioned_hash(), list(Node.t()), DateTime.t()) :: + {:ok, list(TransactionInput.t())} | {:error, :network_issue} + def fetch_inputs_remotely(_, [], _), do: {:ok, []} + + def fetch_inputs_remotely(address, nodes, timestamp = %DateTime{}) + when is_binary(address) and is_list(nodes) do + conflict_resolver = fn results -> + results + |> Enum.sort_by(&length(&1.inputs), :desc) + |> List.first() + end + + case P2P.quorum_read( + nodes, + %GetTransactionInputs{address: address}, + conflict_resolver + ) do + {:ok, %TransactionInputList{inputs: inputs}} -> + filtered_inputs = Enum.filter(inputs, &(DateTime.diff(&1.timestamp, timestamp) <= 0)) + {:ok, filtered_inputs} + + {:error, :network_issue} -> + {:error, :network_issue} + end + end + + @doc """ + Fetch the transaction unspent outputs for a transaction address at a given time + + If the utxo exist, then they are returned in the shape of `{:ok, inputs}`. + If no nodes are able to answer the request, `{:error, :network_issue}` is returned. + """ + @spec fetch_unspent_outputs_remotely( + address :: Crypto.versioned_hash(), + list(Node.t()) + ) :: + {:ok, list(UnspentOutput.t())} | {:error, :network_issue} + def fetch_unspent_outputs_remotely(_, []), do: {:ok, []} + + def fetch_unspent_outputs_remotely(address, nodes) + when is_binary(address) and is_list(nodes) do + conflict_resolver = fn results -> + results + |> Enum.sort_by(&length(&1.unspent_outputs), :desc) + |> List.first() + end + + case P2P.quorum_read( + nodes, + %GetUnspentOutputs{address: address}, + conflict_resolver + ) do + {:ok, %UnspentOutputList{unspent_outputs: unspent_outputs}} -> + {:ok, unspent_outputs} + + {:error, :network_issue} -> + {:error, :network_issue} + end + end + + @doc """ + Fetch the transaction chain length for a transaction address + + The result is returned in the shape of `{:ok, length}`. + If no nodes are able to answer the request, `{:error, :network_issue}` is returned. + """ + @spec fetch_size_remotely(Crypto.versioned_hash(), list(Node.t())) :: + {:ok, non_neg_integer()} | {:error, :network_issue} + def fetch_size_remotely(_, []), do: {:ok, 0} + + def fetch_size_remotely(address, nodes) do + conflict_resolver = fn results -> + Enum.max_by(results, & &1.length) + end + + case P2P.quorum_read( + nodes, + %GetTransactionChainLength{address: address}, + conflict_resolver + ) do + {:ok, %TransactionChainLength{length: length}} -> + {:ok, length} + + {:error, :network_issue} -> + {:error, :network_issue} + end + end end diff --git a/lib/archethic_web/controllers/explorer_controller.ex b/lib/archethic_web/controllers/explorer_controller.ex index 15d4f6e03..e90eccede 100644 --- a/lib/archethic_web/controllers/explorer_controller.ex +++ b/lib/archethic_web/controllers/explorer_controller.ex @@ -32,7 +32,7 @@ defmodule ArchethicWeb.ExplorerController do {:ok, %{uco: uco_balance}} <- Archethic.get_balance(addr), uco_price <- DateTime.utc_now() |> OracleChain.get_uco_price() do render(conn, "chain.html", - transaction_chain: chain, + transaction_chain: List.flatten(chain), chain_size: Enum.count(chain), address: addr, uco_balance: uco_balance, @@ -92,7 +92,7 @@ defmodule ArchethicWeb.ExplorerController do {:ok, %{uco: uco_balance}} <- Archethic.get_balance(addr), uco_price <- DateTime.utc_now() |> OracleChain.get_uco_price() do render(conn, "chain.html", - transaction_chain: chain, + transaction_chain: List.flatten(chain), address: addr, chain_size: Enum.count(chain), uco_balance: uco_balance, diff --git a/lib/archethic_web/live/chains/beacon_live.ex b/lib/archethic_web/live/chains/beacon_live.ex index f24c7b406..0d8782329 100644 --- a/lib/archethic_web/live/chains/beacon_live.ex +++ b/lib/archethic_web/live/chains/beacon_live.ex @@ -13,17 +13,13 @@ defmodule ArchethicWeb.BeaconChainLive do alias Archethic.Election alias Archethic.P2P - alias Archethic.P2P.Message.GetLastTransactionAddress - alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.LastTransactionAddress - alias Archethic.P2P.Message.TransactionList - alias Archethic.P2P.Node alias Archethic.PubSub alias Archethic.SelfRepair.Sync.BeaconSummaryHandler + alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.TransactionData @@ -288,13 +284,15 @@ defmodule ArchethicWeb.BeaconChainLive do |> Flow.partition(key: {:elem, 2}) |> Flow.reduce(fn -> [] end, fn {address, nodes, _subset}, acc -> transactions = - with {:ok, last_address} <- get_last_beacon_address(nodes, address), - {:ok, transactions} <- get_beacon_chain(nodes, last_address) do - transactions - |> Stream.filter(&(&1.type == :beacon)) - |> Stream.map(&deserialize_beacon_transaction/1) - |> Enum.to_list() - else + case TransactionChain.fetch_last_address_remotely(address, nodes) do + {:ok, last_address} -> + last_address + |> TransactionChain.stream_remotely(nodes) + |> Stream.flat_map(& &1) + |> Stream.filter(&(&1.type == :beacon)) + |> Stream.map(&deserialize_beacon_transaction/1) + |> Enum.to_list() + {:error, :network_issue} -> [] end @@ -307,53 +305,6 @@ defmodule ArchethicWeb.BeaconChainLive do |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) end - defp get_last_beacon_address([node | rest], address) do - case P2P.send_message(node, %GetLastTransactionAddress{ - address: address, - timestamp: DateTime.utc_now() - }) do - {:ok, %LastTransactionAddress{address: last_address}} -> - {:ok, last_address} - - {:error, _} -> - get_last_beacon_address(rest, address) - end - end - - defp get_last_beacon_address([], _), do: {:error, :network_issue} - - defp get_beacon_chain(nodes, address, opts \\ [], acc \\ []) - - defp get_beacon_chain(nodes = [node | rest], address, opts, acc) do - message = %GetTransactionChain{ - address: address, - paging_state: Keyword.get(opts, :paging_state) - } - - case P2P.send_message(node, message) do - {:ok, %TransactionList{transactions: transactions, more?: false}} -> - {:ok, Enum.uniq_by(acc ++ transactions, & &1.address)} - - {:ok, %TransactionList{transactions: transactions, more?: true, paging_state: paging_state}} -> - get_beacon_chain( - nodes, - address, - [paging_state: paging_state], - Enum.uniq_by(acc ++ transactions, & &1.address) - ) - - {:error, _} -> - get_beacon_chain( - rest, - address, - opts, - acc - ) - end - end - - defp get_beacon_chain([], _, _, _), do: {:error, :network_issue} - defp deserialize_beacon_transaction(%Transaction{ type: :beacon, data: %TransactionData{content: content} diff --git a/test/archethic/bootstrap/network_init_test.exs b/test/archethic/bootstrap/network_init_test.exs index 6c746770e..e8d98633f 100644 --- a/test/archethic/bootstrap/network_init_test.exs +++ b/test/archethic/bootstrap/network_init_test.exs @@ -17,11 +17,11 @@ defmodule Archethic.Bootstrap.NetworkInitTest do alias Archethic.P2P.Message.GetLastTransactionAddress alias Archethic.P2P.Message.GetTransaction alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.GetUnspentOutputs + alias Archethic.P2P.Message.GetTransactionInputs alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Message.LastTransactionAddress alias Archethic.P2P.Message.TransactionList - alias Archethic.P2P.Message.UnspentOutputList + alias Archethic.P2P.Message.TransactionInputList alias Archethic.P2P.Node alias Archethic.SharedSecrets @@ -39,6 +39,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do alias Archethic.TransactionChain.TransactionData.Ledger alias Archethic.TransactionChain.TransactionData.UCOLedger alias Archethic.TransactionChain.TransactionData.UCOLedger.Transfer + alias Archethic.TransactionChain.TransactionInput alias Archethic.TransactionChain.TransactionSummary alias Archethic.TransactionFactory @@ -129,7 +130,12 @@ defmodule Archethic.Bootstrap.NetworkInitTest do test "self_replication/1 should insert the transaction and add to the beacon chain" do inputs = [ - %UnspentOutput{amount: 499_999_000_000, from: "genesis", type: :UCO} + %TransactionInput{ + amount: 499_999_000_000, + from: "genesis", + type: :UCO, + timestamp: DateTime.utc_now() + } ] MockClient @@ -140,8 +146,8 @@ defmodule Archethic.Bootstrap.NetworkInitTest do _, %GetTransactionChain{}, _ -> {:ok, %TransactionList{transactions: []}} - _, %GetUnspentOutputs{}, _ -> - {:ok, %UnspentOutputList{unspent_outputs: inputs}} + _, %GetTransactionInputs{}, _ -> + {:ok, %TransactionInputList{inputs: inputs}} end) Crypto.generate_deterministic_keypair("daily_nonce_seed") @@ -193,8 +199,8 @@ defmodule Archethic.Bootstrap.NetworkInitTest do _, %GetTransactionChain{}, _ -> {:ok, %TransactionList{transactions: [], more?: false, paging_state: nil}} - _, %GetUnspentOutputs{}, _ -> - {:ok, %UnspentOutputList{unspent_outputs: []}} + _, %GetTransactionInputs{}, _ -> + {:ok, %TransactionInputList{inputs: []}} end) me = self() @@ -241,8 +247,8 @@ defmodule Archethic.Bootstrap.NetworkInitTest do _, %GetTransactionChain{}, _ -> {:ok, %TransactionList{transactions: [], more?: false, paging_state: nil}} - _, %GetUnspentOutputs{}, _ -> - {:ok, %UnspentOutputList{unspent_outputs: []}} + _, %GetTransactionInputs{}, _ -> + {:ok, %TransactionInputList{inputs: []}} _, %GetLastTransactionAddress{address: address}, _ -> {:ok, %LastTransactionAddress{address: address}} @@ -288,8 +294,8 @@ defmodule Archethic.Bootstrap.NetworkInitTest do _, %GetTransactionChain{}, _ -> {:ok, %TransactionList{transactions: [], more?: false, paging_state: nil}} - _, %GetUnspentOutputs{}, _ -> - {:ok, %UnspentOutputList{unspent_outputs: []}} + _, %GetTransactionInputs{}, _ -> + {:ok, %TransactionInputList{inputs: []}} _, %GetLastTransactionAddress{address: address}, _ -> {:ok, %LastTransactionAddress{address: address}} @@ -332,7 +338,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do } }} - {origin_public_key, rest} = Archethic.Utils.deserialize_public_key(content) + {origin_public_key, _rest} = Archethic.Utils.deserialize_public_key(content) assert origin_public_key in @genesis_origin_public_keys end end diff --git a/test/archethic/bootstrap/sync_test.exs b/test/archethic/bootstrap/sync_test.exs index 1d47854fe..04e6a4ade 100644 --- a/test/archethic/bootstrap/sync_test.exs +++ b/test/archethic/bootstrap/sync_test.exs @@ -15,7 +15,7 @@ defmodule Archethic.Bootstrap.SyncTest do alias Archethic.P2P.Message.GetStorageNonce alias Archethic.P2P.Message.GetTransaction alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.GetUnspentOutputs + alias Archethic.P2P.Message.GetTransactionInputs alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Message.LastTransactionAddress alias Archethic.P2P.Message.ListNodes @@ -23,7 +23,7 @@ defmodule Archethic.Bootstrap.SyncTest do alias Archethic.P2P.Message.NotifyEndOfNodeSync alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.TransactionList - alias Archethic.P2P.Message.UnspentOutputList + alias Archethic.P2P.Message.TransactionInputList alias Archethic.P2P.Node alias Archethic.SharedSecrets @@ -51,8 +51,8 @@ defmodule Archethic.Bootstrap.SyncTest do _, %GetTransaction{}, _ -> {:ok, %NotFound{}} - _, %GetUnspentOutputs{}, _ -> - {:ok, %UnspentOutputList{unspent_outputs: []}} + _, %GetTransactionInputs{}, _ -> + {:ok, %TransactionInputList{inputs: []}} _, %GetTransactionChain{}, _ -> {:ok, %TransactionList{transactions: []}} diff --git a/test/archethic/bootstrap_test.exs b/test/archethic/bootstrap_test.exs index 23f1c658a..6193ba72c 100644 --- a/test/archethic/bootstrap_test.exs +++ b/test/archethic/bootstrap_test.exs @@ -18,8 +18,7 @@ defmodule Archethic.BootstrapTest do alias Archethic.P2P.Message.GetTransaction alias Archethic.P2P.Message.GetTransactionChain alias Archethic.P2P.Message.GetTransactionSummary - alias Archethic.P2P.Message.GetUnspentOutputs - alias Archethic.P2P.Message.GetUnspentOutputs + alias Archethic.P2P.Message.GetTransactionInputs alias Archethic.P2P.Message.LastTransactionAddress alias Archethic.P2P.Message.ListNodes alias Archethic.P2P.Message.NewTransaction @@ -27,7 +26,7 @@ defmodule Archethic.BootstrapTest do alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Message.NotifyEndOfNodeSync alias Archethic.P2P.Message.TransactionList - alias Archethic.P2P.Message.UnspentOutputList + alias Archethic.P2P.Message.TransactionInputList alias Archethic.P2P.Message.Ok alias Archethic.P2P.Node @@ -65,8 +64,8 @@ defmodule Archethic.BootstrapTest do _, %GetLastTransactionAddress{address: address}, _ -> {:ok, %LastTransactionAddress{address: address}} - _, %GetUnspentOutputs{}, _ -> - {:ok, %UnspentOutputList{unspent_outputs: []}} + _, %GetTransactionInputs{}, _ -> + {:ok, %TransactionInputList{inputs: []}} _, %GetTransactionChain{}, _ -> {:ok, %TransactionList{transactions: []}} diff --git a/test/archethic/mining/transaction_context/data_fetcher_test.exs b/test/archethic/mining/transaction_context/data_fetcher_test.exs deleted file mode 100644 index 2eb4c997a..000000000 --- a/test/archethic/mining/transaction_context/data_fetcher_test.exs +++ /dev/null @@ -1,124 +0,0 @@ -defmodule Archethic.Mining.TransactionContext.DataFetcherTest do - use ArchethicCase - - alias Archethic.Crypto - - alias Archethic.P2P - alias Archethic.P2P.Message.GetTransaction - alias Archethic.P2P.Message.GetUnspentOutputs - alias Archethic.P2P.Message.NotFound - alias Archethic.P2P.Message.Ok - alias Archethic.P2P.Message.Ping - alias Archethic.P2P.Message.UnspentOutputList - alias Archethic.P2P.Node - - alias Archethic.Mining.TransactionContext.DataFetcher - - alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput - - import Mox - - setup do - P2P.add_and_connect_node(%Node{ - first_public_key: Crypto.last_node_public_key(), - available?: false, - network_patch: "AAA" - }) - - :ok - end - - describe "fetch_previous_transaction/2" do - test "should return the previous transaction and node involve if exists" do - stub(MockClient, :send_message, fn _, %GetTransaction{}, _ -> - {:ok, %Transaction{}} - end) - - node = %Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: "key1", - last_public_key: "key2", - network_patch: "AAA", - authorized?: true, - authorization_date: DateTime.utc_now() - } - - P2P.add_and_connect_node(node) - - assert {:ok, %Transaction{}, %Node{ip: {127, 0, 0, 1}}} = - DataFetcher.fetch_previous_transaction("@Alice2", [node]) - end - - test "should return nil and node node involved if not exists" do - stub(MockClient, :send_message, fn _, %GetTransaction{}, _ -> - {:ok, %NotFound{}} - end) - - node = %Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: "key1", - last_public_key: "key2", - network_patch: "AAA", - authorized?: true, - authorization_date: DateTime.utc_now() - } - - P2P.add_and_connect_node(node) - - assert {:error, :not_found} = DataFetcher.fetch_previous_transaction("@Alice2", [node]) - end - end - - test "should return the unspent outputs and nodes involved if exists" do - MockClient - |> stub(:send_message, fn - _, %GetUnspentOutputs{}, _ -> - {:ok, - %UnspentOutputList{ - unspent_outputs: [%UnspentOutput{from: "@Bob3", amount: 10, type: :UCO}] - }} - end) - - node1 = %Node{ - last_public_key: "key1", - first_public_key: "key1", - ip: {127, 0, 0, 1}, - port: 3002, - available?: true, - geo_patch: "AAA", - network_patch: "AAA", - authorized?: true, - authorization_date: DateTime.utc_now() - } - - P2P.add_and_connect_node(node1) - - {:ok, [%UnspentOutput{from: "@Bob3", amount: 10, type: :UCO}], %Node{last_public_key: "key1"}} = - DataFetcher.fetch_unspent_outputs("@Alice2", [node1]) - end - - describe "fetch_p2p_view/2" do - test "should retrieve the P2P view for a list of node public keys" do - stub(MockClient, :send_message, fn _, %Ping{}, _ -> - {:ok, %Ok{}} - end) - - node = %Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: "key1", - last_public_key: "key2", - network_patch: "AAA", - authorized?: true, - authorization_date: DateTime.utc_now() - } - - P2P.add_and_connect_node(node) - - assert %{"key2" => true, "key3" => false} = DataFetcher.fetch_p2p_view(["key2", "key3"]) - end - end -end diff --git a/test/archethic/mining/transaction_context_test.exs b/test/archethic/mining/transaction_context_test.exs index 05ad9ca22..4570dded5 100644 --- a/test/archethic/mining/transaction_context_test.exs +++ b/test/archethic/mining/transaction_context_test.exs @@ -44,7 +44,11 @@ defmodule Archethic.Mining.TransactionContextTest do {:ok, %UnspentOutputList{ unspent_outputs: [ - %UnspentOutput{from: "@Bob3", amount: 1_000_000_000, type: :UCO} + %UnspentOutput{ + from: "@Bob3", + amount: 1_000_000_000, + type: :UCO + } ] }} end) @@ -89,16 +93,12 @@ defmodule Archethic.Mining.TransactionContextTest do P2P.add_and_connect_node(node2) P2P.add_and_connect_node(node3) - assert {%Transaction{}, [%UnspentOutput{}], involved_nodes, <<1::1, 1::1>>, <<1::1, 1::1>>, + assert {%Transaction{}, [%UnspentOutput{}], _, <<1::1, 1::1>>, <<1::1, 1::1>>, <<1::1, 1::1>>} = TransactionContext.get("@Alice1", ["key1", "key2"], ["key1", "key2"], [ "key2", "key1" ]) - - assert involved_nodes - |> Enum.map(& &1.first_public_key) - |> Enum.all?(&(&1 in ["key1", "key2", "key3"])) end end end diff --git a/test/archethic/p2p_test.exs b/test/archethic/p2p_test.exs index 738c85a98..04b3b5374 100644 --- a/test/archethic/p2p_test.exs +++ b/test/archethic/p2p_test.exs @@ -4,8 +4,12 @@ defmodule Archethic.P2PTest do alias Archethic.Crypto alias Archethic.P2P + alias Archethic.P2P.Message.GetTransaction + alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Node + alias Archethic.TransactionChain.Transaction + doctest Archethic.P2P import Mox @@ -25,4 +29,91 @@ defmodule Archethic.P2PTest do assert %Node{ip: {127, 0, 0, 1}} = P2P.get_node_info() end + + describe "quorum_read/4" do + setup do + pub1 = Crypto.generate_deterministic_keypair("node1") |> elem(0) + pub2 = Crypto.generate_deterministic_keypair("node2") |> elem(0) + pub3 = Crypto.generate_deterministic_keypair("node3") |> elem(0) + pub4 = Crypto.generate_deterministic_keypair("node4") |> elem(0) + pub5 = Crypto.generate_deterministic_keypair("node5") |> elem(0) + + nodes = [ + %Node{ + ip: {127, 0, 0, 1}, + port: 3002, + first_public_key: pub1, + last_public_key: pub1 + }, + %Node{ + ip: {127, 0, 0, 1}, + port: 3003, + first_public_key: pub2, + last_public_key: pub2 + }, + %Node{ + ip: {127, 0, 0, 1}, + port: 3004, + first_public_key: pub3, + last_public_key: pub3 + }, + %Node{ + ip: {127, 0, 0, 1}, + port: 3005, + first_public_key: pub4, + last_public_key: pub4 + }, + %Node{ + ip: {127, 0, 0, 1}, + port: 3006, + first_public_key: pub5, + last_public_key: pub5 + } + ] + + Enum.each(nodes, &P2P.add_and_connect_node/1) + {:ok, %{nodes: nodes}} + end + + test "should return the first result when the same results are returned", %{nodes: nodes} do + MockClient + |> expect( + :send_message, + 3, + fn _node, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + end + ) + + assert {:ok, %Transaction{}} = P2P.quorum_read(nodes, %GetTransaction{address: ""}) + end + + test "should run resolver conflicts when the results are different", %{nodes: nodes} do + MockClient + |> stub(:send_message, fn + %Node{port: 3004}, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + + %Node{port: 3003}, %GetTransaction{}, _timeout -> + {:ok, %NotFound{}} + + %Node{port: 3002}, %GetTransaction{}, _timeout -> + {:ok, %NotFound{}} + + _, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + end) + + assert {:ok, %Transaction{}} = + P2P.quorum_read(nodes, %GetTransaction{address: ""}, fn results -> + case Enum.find(results, &match?(%Transaction{}, &1)) do + nil -> + %NotFound{} + + tx -> + tx + end + end) + end + end end diff --git a/test/archethic/replication/transaction_context_test.exs b/test/archethic/replication/transaction_context_test.exs index e67b2457b..e4f753c23 100644 --- a/test/archethic/replication/transaction_context_test.exs +++ b/test/archethic/replication/transaction_context_test.exs @@ -8,15 +8,16 @@ defmodule Archethic.Replication.TransactionContextTest do alias Archethic.P2P alias Archethic.P2P.Message.GetTransaction alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.GetUnspentOutputs + alias Archethic.P2P.Message.GetTransactionInputs + alias Archethic.P2P.Message.TransactionInputList alias Archethic.P2P.Message.TransactionList - alias Archethic.P2P.Message.UnspentOutputList alias Archethic.P2P.Node alias Archethic.Replication.TransactionContext alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput + alias Archethic.TransactionChain.TransactionInput import Mox @@ -64,44 +65,6 @@ defmodule Archethic.Replication.TransactionContextTest do |> Enum.count() end - test "fetch_unspent_outputs/1 should retrieve the previous unspent outputs" do - UCOLedger.add_unspent_output( - "@Alice1", - %UnspentOutput{ - from: "@Bob3", - amount: 19_300_000, - type: :UCO - }, - ~U[2021-03-05 13:41:34Z] - ) - - MockClient - |> stub(:send_message, fn _, %GetUnspentOutputs{}, _ -> - {:ok, - %UnspentOutputList{ - unspent_outputs: [ - %UnspentOutput{from: "@Bob3", amount: 19_300_000, type: :UCO} - ] - }} - end) - - P2P.add_and_connect_node(%Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: Crypto.last_node_public_key(), - last_public_key: Crypto.last_node_public_key(), - available?: true, - geo_patch: "AAA", - network_patch: "AAA", - authorized?: true, - authorization_date: DateTime.utc_now() - }) - - assert [%UnspentOutput{from: "@Bob3", amount: 19_300_000, type: :UCO}] = - TransactionContext.fetch_unspent_outputs("@Alice1") - |> Enum.to_list() - end - test "fetch_transaction_inputs/2 should retrieve the inputs of a transaction at a given date" do UCOLedger.add_unspent_output( "@Alice1", @@ -114,11 +77,16 @@ defmodule Archethic.Replication.TransactionContextTest do ) MockClient - |> stub(:send_message, fn _, %GetUnspentOutputs{}, _ -> + |> stub(:send_message, fn _, %GetTransactionInputs{}, _ -> {:ok, - %UnspentOutputList{ - unspent_outputs: [ - %UnspentOutput{from: "@Bob3", amount: 19_300_000, type: :UCO} + %TransactionInputList{ + inputs: [ + %TransactionInput{ + from: "@Bob3", + amount: 19_300_000, + type: :UCO, + timestamp: DateTime.utc_now() + } ] }} end) @@ -135,8 +103,7 @@ defmodule Archethic.Replication.TransactionContextTest do authorization_date: DateTime.utc_now() }) - assert [%UnspentOutput{from: "@Bob3", amount: 19_300_000, type: :UCO}] = - TransactionContext.fetch_unspent_outputs("@Alice1") - |> Enum.to_list() + assert [%TransactionInput{from: "@Bob3", amount: 19_300_000, type: :UCO}] = + TransactionContext.fetch_transaction_inputs("@Alice1", DateTime.utc_now()) end end diff --git a/test/archethic/replication_test.exs b/test/archethic/replication_test.exs index 203dd8921..f17bdb911 100644 --- a/test/archethic/replication_test.exs +++ b/test/archethic/replication_test.exs @@ -10,12 +10,12 @@ defmodule Archethic.ReplicationTest do alias Archethic.P2P alias Archethic.P2P.Message.GetTransaction alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.GetUnspentOutputs + alias Archethic.P2P.Message.GetTransactionInputs alias Archethic.P2P.Message.NotifyLastTransactionAddress alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Message.Ok + alias Archethic.P2P.Message.TransactionInputList alias Archethic.P2P.Message.TransactionList - alias Archethic.P2P.Message.UnspentOutputList alias Archethic.P2P.Node alias Archethic.Replication @@ -30,6 +30,7 @@ defmodule Archethic.ReplicationTest do alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput alias Archethic.TransactionChain.TransactionData + alias Archethic.TransactionChain.TransactionInput doctest Archethic.Replication @@ -72,10 +73,19 @@ defmodule Archethic.ReplicationTest do MockClient |> stub(:send_message, fn - _, %GetUnspentOutputs{}, _ -> + _, %GetTransactionInputs{}, _ -> {:ok, - %UnspentOutputList{ - unspent_outputs: unspent_outputs + %TransactionInputList{ + inputs: + Enum.map(unspent_outputs, fn utxo -> + %TransactionInput{ + from: utxo.from, + amount: utxo.amount, + type: utxo.type, + timestamp: + DateTime.utc_now() |> DateTime.add(-30) |> DateTime.truncate(:millisecond) + } + end) }} _, %GetTransactionChain{}, _ -> diff --git a/test/archethic/self_repair/sync/beacon_summary_handler_test.exs b/test/archethic/self_repair/sync/beacon_summary_handler_test.exs index 8c2bba16b..adbcdc0e0 100644 --- a/test/archethic/self_repair/sync/beacon_summary_handler_test.exs +++ b/test/archethic/self_repair/sync/beacon_summary_handler_test.exs @@ -240,7 +240,7 @@ defmodule Archethic.SelfRepair.Sync.BeaconSummaryHandlerTest do %BeaconSummary{transaction_attestations: transaction_attestations} = BeaconSummaryHandler.get_full_beacon_summary(summary_time, "A", [node1, node2]) - transaction_addresses = Enum.map(transaction_attestations, & &1.address) + transaction_addresses = Enum.map(transaction_attestations, & &1.transaction_summary.address) assert Enum.all?(transaction_addresses, &(&1 in [addr1, addr2])) end diff --git a/test/archethic/transaction_chain_test.exs b/test/archethic/transaction_chain_test.exs index e027b210e..df97a05e9 100644 --- a/test/archethic/transaction_chain_test.exs +++ b/test/archethic/transaction_chain_test.exs @@ -4,14 +4,26 @@ defmodule Archethic.TransactionChainTest do alias Archethic.Crypto alias Archethic.P2P + alias Archethic.P2P.Message.GetTransaction + alias Archethic.P2P.Message.GetTransactionChain + alias Archethic.P2P.Message.GetTransactionChainLength alias Archethic.P2P.Message.GetLastTransactionAddress + alias Archethic.P2P.Message.GetTransactionInputs + alias Archethic.P2P.Message.GetUnspentOutputs + alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Message.LastTransactionAddress + alias Archethic.P2P.Message.TransactionChainLength + alias Archethic.P2P.Message.TransactionList + alias Archethic.P2P.Message.TransactionInputList + alias Archethic.P2P.Message.UnspentOutputList alias Archethic.P2P.Node alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.Transaction.ValidationStamp + alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput alias Archethic.TransactionChain.TransactionData + alias Archethic.TransactionChain.TransactionInput doctest TransactionChain @@ -39,7 +51,460 @@ defmodule Archethic.TransactionChainTest do authorization_date: DateTime.utc_now() }) - assert "@Alice1" = TransactionChain.resolve_last_address("@Alice1", ~U[2021-03-25 15:11:29Z]) - assert "@Alice2" = TransactionChain.resolve_last_address("@Alice1", ~U[2021-03-25 15:12:29Z]) + assert {:ok, "@Alice1"} = + TransactionChain.resolve_last_address("@Alice1", ~U[2021-03-25 15:11:29Z]) + + assert {:ok, "@Alice2"} = + TransactionChain.resolve_last_address("@Alice1", ~U[2021-03-25 15:12:29Z]) + end + + describe "fetch_transaction_remotely/2" do + test "should get the transaction" do + nodes = [ + %Node{ + first_public_key: "node1", + last_public_key: "node1", + ip: {127, 0, 0, 1}, + port: 3000 + }, + %Node{ + first_public_key: "node2", + last_public_key: "node2", + ip: {127, 0, 0, 1}, + port: 3001 + }, + %Node{ + first_public_key: "node3", + last_public_key: "node3", + ip: {127, 0, 0, 1}, + port: 3002 + } + ] + + Enum.each(nodes, &P2P.add_and_connect_node/1) + + MockClient + |> stub(:send_message, fn _, %GetTransaction{address: _}, _ -> + {:ok, %Transaction{}} + end) + + assert {:ok, %Transaction{}} = TransactionChain.fetch_transaction_remotely("Alice1", nodes) + end + + test "should resolve and get tx if one tx is returned" do + nodes = [ + %Node{ + first_public_key: "node1", + last_public_key: "node1", + ip: {127, 0, 0, 1}, + port: 3000 + }, + %Node{ + first_public_key: "node2", + last_public_key: "node2", + ip: {127, 0, 0, 1}, + port: 3001 + }, + %Node{ + first_public_key: "node3", + last_public_key: "node3", + ip: {127, 0, 0, 1}, + port: 3002 + } + ] + + Enum.each(nodes, &P2P.add_and_connect_node/1) + + MockClient + |> stub(:send_message, fn + %Node{port: 3000}, %GetTransaction{address: _}, _ -> + {:ok, %NotFound{}} + + %Node{port: 3001}, %GetTransaction{address: _}, _ -> + {:ok, %Transaction{}} + + %Node{port: 3002}, %GetTransaction{address: _}, _ -> + {:ok, %NotFound{}} + end) + + assert {:ok, %Transaction{}} = TransactionChain.fetch_transaction_remotely("Alice1", nodes) + end + end + + describe "stream_remotely/2" do + test "should get the transaction chain" do + nodes = [ + %Node{ + first_public_key: "node1", + last_public_key: "node1", + ip: {127, 0, 0, 1}, + port: 3000 + }, + %Node{ + first_public_key: "node2", + last_public_key: "node2", + ip: {127, 0, 0, 1}, + port: 3001 + }, + %Node{ + first_public_key: "node3", + last_public_key: "node3", + ip: {127, 0, 0, 1}, + port: 3002 + } + ] + + Enum.each(nodes, &P2P.add_and_connect_node/1) + + MockClient + |> stub(:send_message, fn _, %GetTransactionChain{address: _}, _ -> + {:ok, + %TransactionList{ + transactions: [ + %Transaction{} + ] + }} + end) + + assert 1 = + TransactionChain.stream_remotely("Alice1", nodes) + |> Enum.to_list() + |> List.first() + |> length() + end + + test "should resolve the longest chain" do + nodes = [ + %Node{ + first_public_key: "node1", + last_public_key: "node1", + ip: {127, 0, 0, 1}, + port: 3000 + }, + %Node{ + first_public_key: "node2", + last_public_key: "node2", + ip: {127, 0, 0, 1}, + port: 3001 + }, + %Node{ + first_public_key: "node3", + last_public_key: "node3", + ip: {127, 0, 0, 1}, + port: 3002 + } + ] + + Enum.each(nodes, &P2P.add_and_connect_node/1) + + MockClient + |> stub(:send_message, fn + %Node{port: 3000}, %GetTransactionChain{address: _}, _ -> + {:ok, %TransactionList{}} + + %Node{port: 3001}, %GetTransactionChain{address: _}, _ -> + {:ok, + %TransactionList{ + transactions: [ + %Transaction{}, + %Transaction{}, + %Transaction{}, + %Transaction{} + ], + more?: false + }} + + %Node{port: 3002}, %GetTransactionChain{address: _}, _ -> + {:ok, + %TransactionList{ + transactions: [ + %Transaction{}, + %Transaction{}, + %Transaction{}, + %Transaction{}, + %Transaction{} + ], + more?: false + }} + end) + + assert 5 = + TransactionChain.stream_remotely("Alice1", nodes) + |> Enum.to_list() + |> List.first() + |> length() + end + end + + describe "fetch_inputs_remotely/2" do + test "should get the inputs" do + nodes = [ + %Node{ + first_public_key: "node1", + last_public_key: "node1", + ip: {127, 0, 0, 1}, + port: 3000 + }, + %Node{ + first_public_key: "node2", + last_public_key: "node2", + ip: {127, 0, 0, 1}, + port: 3001 + }, + %Node{ + first_public_key: "node3", + last_public_key: "node3", + ip: {127, 0, 0, 1}, + port: 3002 + } + ] + + Enum.each(nodes, &P2P.add_and_connect_node/1) + + MockClient + |> stub(:send_message, fn _, %GetTransactionInputs{address: _}, _ -> + {:ok, + %TransactionInputList{ + inputs: [ + %TransactionInput{ + from: "Alice2", + amount: 10, + type: :UCO, + spent?: false, + timestamp: DateTime.utc_now() + } + ] + }} + end) + + assert {:ok, [%TransactionInput{from: "Alice2", amount: 10, type: :UCO}]} = + TransactionChain.fetch_inputs_remotely("Alice1", nodes, DateTime.utc_now()) + end + + test "should resolve the longest inputs when conflicts" do + nodes = [ + %Node{ + first_public_key: "node1", + last_public_key: "node1", + ip: {127, 0, 0, 1}, + port: 3000 + }, + %Node{ + first_public_key: "node2", + last_public_key: "node2", + ip: {127, 0, 0, 1}, + port: 3001 + }, + %Node{ + first_public_key: "node3", + last_public_key: "node3", + ip: {127, 0, 0, 1}, + port: 3002 + } + ] + + Enum.each(nodes, &P2P.add_and_connect_node/1) + + MockClient + |> stub(:send_message, fn + %Node{port: 3000}, %GetTransactionInputs{address: _}, _ -> + {:ok, %TransactionInputList{inputs: []}} + + %Node{port: 3001}, %GetTransactionInputs{address: _}, _ -> + {:ok, + %TransactionInputList{ + inputs: [ + %TransactionInput{ + from: "Alice2", + amount: 10, + type: :UCO, + timestamp: DateTime.utc_now() + } + ] + }} + + %Node{port: 3002}, %GetTransactionInputs{address: _}, _ -> + {:ok, + %TransactionInputList{ + inputs: [ + %TransactionInput{ + from: "Alice2", + amount: 10, + type: :UCO, + timestamp: DateTime.utc_now() + }, + %TransactionInput{ + from: "Bob3", + amount: 2, + type: :UCO, + timestamp: DateTime.utc_now() + } + ] + }} + end) + + assert {:ok, [%TransactionInput{from: "Alice2"}, %TransactionInput{from: "Bob3"}]} = + TransactionChain.fetch_inputs_remotely("Alice1", nodes, DateTime.utc_now()) + end + end + + describe "fetch_unspent_outputs_remotely/2" do + test "should get the utxos" do + nodes = [ + %Node{ + first_public_key: "node1", + last_public_key: "node1", + ip: {127, 0, 0, 1}, + port: 3000 + }, + %Node{ + first_public_key: "node2", + last_public_key: "node2", + ip: {127, 0, 0, 1}, + port: 3001 + }, + %Node{ + first_public_key: "node3", + last_public_key: "node3", + ip: {127, 0, 0, 1}, + port: 3002 + } + ] + + Enum.each(nodes, &P2P.add_and_connect_node/1) + + MockClient + |> stub(:send_message, fn _, %GetUnspentOutputs{address: _}, _ -> + {:ok, + %UnspentOutputList{ + unspent_outputs: [%UnspentOutput{from: "Alice2", amount: 10, type: :UCO}] + }} + end) + + assert {:ok, [%UnspentOutput{from: "Alice2", amount: 10, type: :UCO}]} = + TransactionChain.fetch_unspent_outputs_remotely("Alice1", nodes) + end + + test "should resolve the longest utxos when conflicts" do + nodes = [ + %Node{ + first_public_key: "node1", + last_public_key: "node1", + ip: {127, 0, 0, 1}, + port: 3000 + }, + %Node{ + first_public_key: "node2", + last_public_key: "node2", + ip: {127, 0, 0, 1}, + port: 3001 + }, + %Node{ + first_public_key: "node3", + last_public_key: "node3", + ip: {127, 0, 0, 1}, + port: 3002 + } + ] + + Enum.each(nodes, &P2P.add_and_connect_node/1) + + MockClient + |> stub(:send_message, fn + %Node{port: 3000}, %GetUnspentOutputs{address: _}, _ -> + {:ok, %UnspentOutputList{unspent_outputs: []}} + + %Node{port: 3001}, %GetUnspentOutputs{address: _}, _ -> + {:ok, + %UnspentOutputList{ + unspent_outputs: [%UnspentOutput{from: "Alice2", amount: 10, type: :UCO}] + }} + + %Node{port: 3002}, %GetUnspentOutputs{address: _}, _ -> + {:ok, + %UnspentOutputList{ + unspent_outputs: [ + %UnspentOutput{from: "Alice2", amount: 10, type: :UCO}, + %UnspentOutput{from: "Bob3", amount: 2, type: :UCO} + ] + }} + end) + + assert {:ok, [%UnspentOutput{from: "Alice2"}, %UnspentOutput{from: "Bob3"}]} = + TransactionChain.fetch_unspent_outputs_remotely("Alice1", nodes) + end + end + + describe "fetch_size_remotely/2" do + test "should get the transaction chain length" do + nodes = [ + %Node{ + first_public_key: "node1", + last_public_key: "node1", + ip: {127, 0, 0, 1}, + port: 3000 + }, + %Node{ + first_public_key: "node2", + last_public_key: "node2", + ip: {127, 0, 0, 1}, + port: 3001 + }, + %Node{ + first_public_key: "node3", + last_public_key: "node3", + ip: {127, 0, 0, 1}, + port: 3002 + } + ] + + Enum.each(nodes, &P2P.add_and_connect_node/1) + + MockClient + |> stub(:send_message, fn _, %GetTransactionChainLength{address: _}, _ -> + {:ok, %TransactionChainLength{length: 1}} + end) + + assert {:ok, 1} = TransactionChain.fetch_size_remotely("Alice1", nodes) + end + + test "should resolve the longest transaction chain when conflicts" do + nodes = [ + %Node{ + first_public_key: "node1", + last_public_key: "node1", + ip: {127, 0, 0, 1}, + port: 3000 + }, + %Node{ + first_public_key: "node2", + last_public_key: "node2", + ip: {127, 0, 0, 1}, + port: 3001 + }, + %Node{ + first_public_key: "node3", + last_public_key: "node3", + ip: {127, 0, 0, 1}, + port: 3002 + } + ] + + Enum.each(nodes, &P2P.add_and_connect_node/1) + + MockClient + |> stub(:send_message, fn + %Node{port: 3000}, %GetTransactionChainLength{address: _}, _ -> + {:ok, %TransactionChainLength{length: 1}} + + %Node{port: 3001}, %GetTransactionChainLength{address: _}, _ -> + {:ok, %TransactionChainLength{length: 2}} + + %Node{port: 3002}, %GetTransactionChainLength{address: _}, _ -> + {:ok, %TransactionChainLength{length: 1}} + end) + + assert {:ok, 2} = TransactionChain.fetch_size_remotely("Alice1", nodes) + end end end diff --git a/test/archethic_test.exs b/test/archethic_test.exs index 5cbbd507e..6c75b09ae 100644 --- a/test/archethic_test.exs +++ b/test/archethic_test.exs @@ -10,11 +10,12 @@ defmodule ArchethicTest do alias Archethic.P2P alias Archethic.P2P.Message.Balance alias Archethic.P2P.Message.GetBalance - alias Archethic.P2P.Message.GetLastTransaction + alias Archethic.P2P.Message.GetLastTransactionAddress alias Archethic.P2P.Message.GetTransaction alias Archethic.P2P.Message.GetTransactionChain alias Archethic.P2P.Message.GetTransactionChainLength alias Archethic.P2P.Message.GetTransactionInputs + alias Archethic.P2P.Message.LastTransactionAddress alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Message.Ok alias Archethic.P2P.Message.StartMining @@ -144,8 +145,12 @@ defmodule ArchethicTest do }) MockClient - |> expect(:send_message, fn _, %GetLastTransaction{}, _ -> - {:ok, %Transaction{previous_public_key: "Alice1"}} + |> stub(:send_message, fn + _, %GetLastTransactionAddress{address: address}, _ -> + {:ok, %LastTransactionAddress{address: address}} + + _, %GetTransaction{}, _ -> + {:ok, %Transaction{previous_public_key: "Alice1"}} end) assert {:ok, %Transaction{previous_public_key: "Alice1"}} = @@ -177,8 +182,12 @@ defmodule ArchethicTest do }) MockClient - |> expect(:send_message, fn _, %GetLastTransaction{}, _ -> - {:ok, %NotFound{}} + |> stub(:send_message, fn + _, %GetLastTransactionAddress{address: address}, _ -> + {:ok, %LastTransactionAddress{address: address}} + + _, %GetTransaction{}, _ -> + {:ok, %NotFound{}} end) assert {:error, :transaction_not_exists} = diff --git a/test/archethic_web/controllers/api/origin_key_controller_test.exs b/test/archethic_web/controllers/api/origin_key_controller_test.exs index cc518718d..6df5174ce 100644 --- a/test/archethic_web/controllers/api/origin_key_controller_test.exs +++ b/test/archethic_web/controllers/api/origin_key_controller_test.exs @@ -2,18 +2,10 @@ defmodule ArchethicWeb.API.OriginKeyControllerTest do use ArchethicCase use ArchethicWeb.ConnCase - alias Archethic.P2P - - alias Archethic.P2P.Message.{ - LastTransactionAddress, - GetLastTransactionAddress, - GetTransactionChainLength, - TransactionChainLength - } + alias Archethic.Crypto + alias Archethic.P2P alias Archethic.P2P.Node - alias Archethic.TransactionChain - alias Archethic.Crypto import Mox diff --git a/test/archethic_web/controllers/api/web_hosting_controller_test.exs b/test/archethic_web/controllers/api/web_hosting_controller_test.exs index 0db725271..504471a48 100644 --- a/test/archethic_web/controllers/api/web_hosting_controller_test.exs +++ b/test/archethic_web/controllers/api/web_hosting_controller_test.exs @@ -7,7 +7,9 @@ defmodule ArchethicWeb.API.WebHostingControllerTest do alias Archethic.Crypto - alias Archethic.P2P.Message.GetLastTransaction + alias Archethic.P2P.Message.GetLastTransactionAddress + alias Archethic.P2P.Message.GetTransaction + alias Archethic.P2P.Message.LastTransactionAddress alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.TransactionData @@ -33,8 +35,12 @@ defmodule ArchethicWeb.API.WebHostingControllerTest do describe "web_hosting/2" do test "should return Invalid address", %{conn: conn} do MockClient - |> stub(:send_message, fn _, %GetLastTransaction{}, _ -> - {:error, :transaction_not_exists} + |> stub(:send_message, fn + _, %GetLastTransactionAddress{address: address}, _ -> + {:ok, %LastTransactionAddress{address: address}} + + _, %GetTransaction{}, _ -> + {:error, :transaction_not_exists} end) conn1 = get(conn, "/api/web_hosting/AZERTY") @@ -53,14 +59,18 @@ defmodule ArchethicWeb.API.WebHostingControllerTest do test "should return Invalid transaction content", %{conn: conn} do MockClient - |> stub(:send_message, fn _, %GetLastTransaction{}, _ -> - {:ok, - %Transaction{ - address: - <<0, 0, 34, 84, 150, 163, 128, 213, 0, 92, 182, 131, 116, 233, 184, 180, 93, 126, 15, - 80, 90, 66, 248, 205, 97, 203, 212, 60, 54, 132, 197, 203, 172, 186>>, - data: %TransactionData{content: "invalid"} - }} + |> stub(:send_message, fn + _, %GetLastTransactionAddress{address: address}, _ -> + {:ok, %LastTransactionAddress{address: address}} + + _, %GetTransaction{}, _ -> + {:ok, + %Transaction{ + address: + <<0, 0, 34, 84, 150, 163, 128, 213, 0, 92, 182, 131, 116, 233, 184, 180, 93, 126, + 15, 80, 90, 66, 248, 205, 97, 203, 212, 60, 54, 132, 197, 203, 172, 186>>, + data: %TransactionData{content: "invalid"} + }} end) conn = @@ -91,14 +101,18 @@ defmodule ArchethicWeb.API.WebHostingControllerTest do """ MockClient - |> stub(:send_message, fn _, %GetLastTransaction{}, _ -> - {:ok, - %Transaction{ - address: - <<0, 0, 34, 84, 150, 163, 128, 213, 0, 92, 182, 131, 116, 233, 184, 180, 93, 126, 15, - 80, 90, 66, 248, 205, 97, 203, 212, 60, 54, 132, 197, 203, 172, 186>>, - data: %TransactionData{content: content} - }} + |> stub(:send_message, fn + _, %GetLastTransactionAddress{address: address}, _ -> + {:ok, %LastTransactionAddress{address: address}} + + _, %GetTransaction{}, _ -> + {:ok, + %Transaction{ + address: + <<0, 0, 34, 84, 150, 163, 128, 213, 0, 92, 182, 131, 116, 233, 184, 180, 93, 126, + 15, 80, 90, 66, 248, 205, 97, 203, 212, 60, 54, 132, 197, 203, 172, 186>>, + data: %TransactionData{content: content} + }} end) :ok @@ -164,14 +178,18 @@ defmodule ArchethicWeb.API.WebHostingControllerTest do """ MockClient - |> stub(:send_message, fn _, %GetLastTransaction{}, _ -> - {:ok, - %Transaction{ - address: - <<0, 0, 34, 84, 150, 163, 128, 213, 0, 92, 182, 131, 116, 233, 184, 180, 93, 126, 15, - 80, 90, 66, 248, 205, 97, 203, 212, 60, 54, 132, 197, 203, 172, 186>>, - data: %TransactionData{content: content} - }} + |> stub(:send_message, fn + _, %GetLastTransactionAddress{address: address}, _ -> + {:ok, %LastTransactionAddress{address: address}} + + _, %GetTransaction{}, _ -> + {:ok, + %Transaction{ + address: + <<0, 0, 34, 84, 150, 163, 128, 213, 0, 92, 182, 131, 116, 233, 184, 180, 93, 126, + 15, 80, 90, 66, 248, 205, 97, 203, 212, 60, 54, 132, 197, 203, 172, 186>>, + data: %TransactionData{content: content} + }} end) :ok @@ -270,14 +288,18 @@ defmodule ArchethicWeb.API.WebHostingControllerTest do """ MockClient - |> stub(:send_message, fn _, %GetLastTransaction{}, _ -> - {:ok, - %Transaction{ - address: - <<0, 0, 34, 84, 150, 163, 128, 213, 0, 92, 182, 131, 116, 233, 184, 180, 93, 126, 15, - 80, 90, 66, 248, 205, 97, 203, 212, 60, 54, 132, 197, 203, 172, 186>>, - data: %TransactionData{content: content} - }} + |> stub(:send_message, fn + _, %GetLastTransactionAddress{address: address}, _ -> + {:ok, %LastTransactionAddress{address: address}} + + _, %GetTransaction{}, _ -> + {:ok, + %Transaction{ + address: + <<0, 0, 34, 84, 150, 163, 128, 213, 0, 92, 182, 131, 116, 233, 184, 180, 93, 126, + 15, 80, 90, 66, 248, 205, 97, 203, 212, 60, 54, 132, 197, 203, 172, 186>>, + data: %TransactionData{content: content} + }} end) conn1 = diff --git a/test/archethic_web/graphql_schema_test.exs b/test/archethic_web/graphql_schema_test.exs index e9d3126e6..35b1db34e 100644 --- a/test/archethic_web/graphql_schema_test.exs +++ b/test/archethic_web/graphql_schema_test.exs @@ -10,10 +10,11 @@ defmodule ArchethicWeb.GraphQLSchemaTest do alias Archethic.P2P alias Archethic.P2P.Message.Balance alias Archethic.P2P.Message.GetBalance - alias Archethic.P2P.Message.GetLastTransaction + alias Archethic.P2P.Message.GetLastTransactionAddress alias Archethic.P2P.Message.GetTransaction alias Archethic.P2P.Message.GetTransactionChain alias Archethic.P2P.Message.GetTransactionInputs + alias Archethic.P2P.Message.LastTransactionAddress alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Message.TransactionInputList alias Archethic.P2P.Message.TransactionList @@ -99,13 +100,12 @@ defmodule ArchethicWeb.GraphQLSchemaTest do last_address = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> MockClient - |> stub(:send_message, fn _, %GetLastTransaction{}, _ -> - {:ok, - %Transaction{ - address: last_address, - type: :transfer, - data: %TransactionData{} - }} + |> stub(:send_message, fn + _, %GetLastTransactionAddress{}, _ -> + {:ok, %LastTransactionAddress{address: last_address}} + + _, %GetTransaction{address: ^last_address}, _ -> + {:ok, %Transaction{address: last_address, type: :transfer}} end) conn = @@ -124,8 +124,12 @@ defmodule ArchethicWeb.GraphQLSchemaTest do addr = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> MockClient - |> stub(:send_message, fn _, %GetLastTransaction{}, _ -> - {:ok, %NotFound{}} + |> stub(:send_message, fn + _, %GetLastTransactionAddress{address: address}, _ -> + {:ok, %LastTransactionAddress{address: address}} + + _, %GetTransaction{}, _ -> + {:ok, %NotFound{}} end) conn =