From 7b7585632456c1675e4d8ff63d30c5bfdf1817e6 Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Wed, 14 Sep 2022 11:49:02 +0200 Subject: [PATCH 1/6] Support stream of UTXO The aim is to reduce timeouts by streaming into batches < 3MB. This would be used to retrieve all the UTXO in the transaction context phase of the mining. --- lib/archethic/mining/transaction_context.ex | 27 ++------ lib/archethic/p2p/message.ex | 66 +++++++++++++++---- .../p2p/message/get_unspent_outputs.ex | 5 +- .../p2p/message/unspent_output_list.ex | 6 +- lib/archethic/transaction_chain.ex | 46 +++++++++---- test/archethic/transaction_chain_test.exs | 4 +- 6 files changed, 101 insertions(+), 53 deletions(-) diff --git a/lib/archethic/mining/transaction_context.ex b/lib/archethic/mining/transaction_context.ex index 4be431874..2895e5405 100644 --- a/lib/archethic/mining/transaction_context.ex +++ b/lib/archethic/mining/transaction_context.ex @@ -59,18 +59,11 @@ defmodule Archethic.Mining.TransactionContext do |> Enum.uniq() prev_tx_task = request_previous_tx(previous_address, prev_tx_nodes_split) - utxo_task = request_utxo(previous_address, unspent_outputs_nodes_split) nodes_view_task = request_nodes_view(node_public_keys) prev_tx = Task.await(prev_tx_task, Message.get_max_timeout()) - utxos = Task.await(utxo_task) nodes_view = Task.await(nodes_view_task) - # involved_nodes = - # [prev_tx_node_involved, utxo_node_involved] - # |> Enum.filter(& &1) - # |> P2P.distinct_nodes() - %{ chain_nodes_view: chain_storage_nodes_view, beacon_nodes_view: beacon_storage_nodes_view, @@ -83,7 +76,13 @@ defmodule Archethic.Mining.TransactionContext do io_storage_node_public_keys ) - # TODO: remove the invovled nodes as not used anymore + utxos = + TransactionChain.stream_unspent_outputs_remotely( + previous_address, + unspent_outputs_nodes_split + ) + |> Enum.to_list() + {prev_tx, utxos, [], chain_storage_nodes_view, beacon_storage_nodes_view, io_storage_nodes_view} end @@ -117,18 +116,6 @@ defmodule Archethic.Mining.TransactionContext do end) end - defp request_utxo(previous_address, nodes) do - Task.Supervisor.async(TaskSupervisor, fn -> - {:ok, utxos} = - TransactionChain.fetch_unspent_outputs_remotely( - previous_address, - nodes - ) - - utxos - end) - end - defp request_nodes_view(node_public_keys) do Task.Supervisor.async(TaskSupervisor, fn -> Task.Supervisor.async_stream_nolink( diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index e4ba57360..43ae7ae2c 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -174,10 +174,6 @@ defmodule Archethic.P2P.Message do @spec get_timeout(__MODULE__.t()) :: non_neg_integer() def get_timeout(%GetTransaction{}), do: get_max_timeout() def get_timeout(%GetLastTransaction{}), do: get_max_timeout() - def get_timeout(%NewTransaction{}), do: get_max_timeout() - def get_timeout(%StartMining{}), do: get_max_timeout() - def get_timeout(%ReplicateTransaction{}), do: get_max_timeout() - def get_timeout(%ReplicateTransactionChain{}), do: get_max_timeout() def get_timeout(%GetTransactionChain{}) do # As we use 10 transaction in the pagination we can estimate the max time @@ -191,7 +187,6 @@ defmodule Archethic.P2P.Message do # length(addresses) * trunc(beacon_summary_high_estimation_bytes / @floor_upload_speed * 1000) # end - def get_timeout(%GetUnspentOutputs{}), do: get_max_timeout() def get_timeout(%GetTransactionInputs{}), do: get_max_timeout() def get_timeout(_), do: 3_000 @@ -249,8 +244,8 @@ defmodule Archethic.P2P.Message do <<4::8, tx_address::binary, byte_size(paging_state)::8, paging_state::binary>> end - def encode(%GetUnspentOutputs{address: tx_address}) do - <<5::8, tx_address::binary>> + def encode(%GetUnspentOutputs{address: tx_address, offset: offset}) do + <<5::8, tx_address::binary, VarInt.from_value(offset)::binary>> end def encode(%NewTransaction{transaction: tx}) do @@ -560,7 +555,7 @@ defmodule Archethic.P2P.Message do <<249::8, encoded_nodes_length::binary, nodes_bin::bitstring>> end - def encode(%UnspentOutputList{unspent_outputs: unspent_outputs}) do + def encode(%UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}) do unspent_outputs_bin = unspent_outputs |> Stream.map(&UnspentOutput.serialize/1) @@ -569,7 +564,10 @@ defmodule Archethic.P2P.Message do encoded_unspent_outputs_length = Enum.count(unspent_outputs) |> VarInt.from_value() - <<250::8, encoded_unspent_outputs_length::binary, unspent_outputs_bin::binary>> + more_bit = if more?, do: 1, else: 0 + + <<250::8, encoded_unspent_outputs_length::binary, unspent_outputs_bin::binary, more_bit::1, + VarInt.from_value(offset)::binary>> end def encode(%TransactionList{transactions: transactions, more?: false}) do @@ -667,7 +665,9 @@ defmodule Archethic.P2P.Message do def decode(<<5::8, rest::bitstring>>) do {address, rest} = Utils.deserialize_address(rest) - {%GetUnspentOutputs{address: address}, rest} + + {offset, rest} = VarInt.get_value(rest) + {%GetUnspentOutputs{address: address, offset: offset}, rest} end def decode(<<6::8, rest::bitstring>>) do @@ -1086,8 +1086,15 @@ defmodule Archethic.P2P.Message do def decode(<<250::8, rest::bitstring>>) do {nb_unspent_outputs, rest} = rest |> VarInt.get_value() - {unspent_outputs, rest} = deserialize_unspent_output_list(rest, nb_unspent_outputs, []) - {%UnspentOutputList{unspent_outputs: unspent_outputs}, rest} + + {unspent_outputs, <>} = + deserialize_unspent_output_list(rest, nb_unspent_outputs, []) + + more? = more_bit == 1 + + {offset, rest} = VarInt.get_value(rest) + + {%UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}, rest} end def decode(<<251::8, rest::bitstring>>) do @@ -1276,9 +1283,40 @@ defmodule Archethic.P2P.Message do %TransactionList{transactions: chain, paging_state: paging_state, more?: more?} end - def process(%GetUnspentOutputs{address: tx_address}) do + def process(%GetUnspentOutputs{address: tx_address, offset: offset}) do + utxos = Account.get_unspent_outputs(tx_address) + utxos_length = length(utxos) + + %{utxos: utxos, offset: offset, more?: more?} = + utxos + |> Enum.with_index() + |> Enum.drop(offset) + |> Enum.reduce_while(%{utxos: [], offset: 0, more?: false}, fn {utxo, index}, acc -> + acc_size = + acc.utxos + |> Enum.map(&UnspentOutput.serialize/1) + |> :erlang.list_to_binary() + |> byte_size() + + utxo_size = UnspentOutput.serialize(utxo) |> byte_size + + if acc_size + utxo_size < 3_000_000 do + new_acc = + acc + |> Map.update!(:utxos, &[utxo | &1]) + |> Map.put(:offset, index + 1) + |> Map.put(:more?, index + 1 < utxos_length) + + {:cont, new_acc} + else + {:halt, acc} + end + end) + %UnspentOutputList{ - unspent_outputs: Account.get_unspent_outputs(tx_address) + unspent_outputs: utxos, + offset: offset, + more?: more? } end diff --git a/lib/archethic/p2p/message/get_unspent_outputs.ex b/lib/archethic/p2p/message/get_unspent_outputs.ex index 3b87a8df7..9bf348557 100644 --- a/lib/archethic/p2p/message/get_unspent_outputs.ex +++ b/lib/archethic/p2p/message/get_unspent_outputs.ex @@ -3,11 +3,12 @@ defmodule Archethic.P2P.Message.GetUnspentOutputs do Represents a message to request the list of unspent outputs from a transaction """ @enforce_keys [:address] - defstruct [:address] + defstruct [:address, offset: 0] alias Archethic.Crypto @type t :: %__MODULE__{ - address: Crypto.versioned_hash() + address: Crypto.versioned_hash(), + offset: non_neg_integer() } end diff --git a/lib/archethic/p2p/message/unspent_output_list.ex b/lib/archethic/p2p/message/unspent_output_list.ex index a41df627f..f76981422 100644 --- a/lib/archethic/p2p/message/unspent_output_list.ex +++ b/lib/archethic/p2p/message/unspent_output_list.ex @@ -2,11 +2,13 @@ defmodule Archethic.P2P.Message.UnspentOutputList do @moduledoc """ Represents a message with a list of unspent outputs """ - defstruct unspent_outputs: [] + defstruct unspent_outputs: [], more?: false, offset: 0 alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput @type t :: %__MODULE__{ - unspent_outputs: list(UnspentOutput.t()) + unspent_outputs: list(UnspentOutput.t()), + more?: boolean(), + offset: non_neg_integer() } end diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 7fd2d7261..e29accdf1 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -755,20 +755,40 @@ defmodule Archethic.TransactionChain do end @doc """ - Fetch the transaction unspent outputs for a transaction address at a given time - - If the utxo exist, then they are returned in the shape of `{:ok, inputs}`. - If no nodes are able to answer the request, `{:error, :network_issue}` is returned. + Stream the transaction unspent outputs for a transaction address """ - @spec fetch_unspent_outputs_remotely( + @spec stream_unspent_outputs_remotely( address :: Crypto.versioned_hash(), list(Node.t()) - ) :: - {:ok, list(UnspentOutput.t())} | {:error, :network_issue} - def fetch_unspent_outputs_remotely(_, []), do: {:ok, []} + ) :: Enumerable.t() | list(UnspentOutput.t()) + def stream_unspent_outputs_remotely(_, []), do: [] - def fetch_unspent_outputs_remotely(address, nodes) + def stream_unspent_outputs_remotely(address, nodes) when is_binary(address) and is_list(nodes) do + Stream.resource( + fn -> fetch_unspent_outputs_remotely(address, nodes) end, + fn + {utxos, true, offset} -> + {utxos, fetch_unspent_outputs_remotely(address, nodes, offset)} + + {utxos, false, _} -> + {utxos, :eof} + + :eof -> + {:halt, nil} + end, + fn _ -> :ok end + ) + end + + @doc """ + Fetch the unspent outputs + """ + @spec fetch_unspent_outputs_remotely(binary(), list(Node.t()), non_neg_integer()) :: + {list(UnspentOutput.t()), boolean(), non_neg_integer() | nil} + def fetch_unspent_outputs_remotely(address, nodes, offset \\ 0) + + def fetch_unspent_outputs_remotely(address, nodes, offset) do conflict_resolver = fn results -> results |> Enum.sort_by(&length(&1.unspent_outputs), :desc) @@ -777,14 +797,14 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, - %GetUnspentOutputs{address: address}, + %GetUnspentOutputs{address: address, offset: offset}, conflict_resolver ) do - {:ok, %UnspentOutputList{unspent_outputs: unspent_outputs}} -> - {:ok, unspent_outputs} + {:ok, %UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}} -> + {unspent_outputs, more?, offset} {:error, :network_issue} -> - {:error, :network_issue} + {[], false, nil} end end diff --git a/test/archethic/transaction_chain_test.exs b/test/archethic/transaction_chain_test.exs index ab37d82b0..f43435755 100644 --- a/test/archethic/transaction_chain_test.exs +++ b/test/archethic/transaction_chain_test.exs @@ -436,7 +436,7 @@ defmodule Archethic.TransactionChainTest do }} end) - assert {:ok, [%UnspentOutput{from: "Alice2", amount: 10, type: :UCO}]} = + assert {[%UnspentOutput{from: "Alice2", amount: 10, type: :UCO}], _more?, _offset} = TransactionChain.fetch_unspent_outputs_remotely("Alice1", nodes) end @@ -485,7 +485,7 @@ defmodule Archethic.TransactionChainTest do }} end) - assert {:ok, [%UnspentOutput{from: "Alice2"}, %UnspentOutput{from: "Bob3"}]} = + assert {[%UnspentOutput{from: "Alice2"}, %UnspentOutput{from: "Bob3"}], _more?, _offset} = TransactionChain.fetch_unspent_outputs_remotely("Alice1", nodes) end end From 0d0d5aad0efd07014f8cab4370f9355b4e1728f2 Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Wed, 14 Sep 2022 14:52:20 +0200 Subject: [PATCH 2/6] Support stream of tx inputs --- lib/archethic.ex | 25 +++++++- lib/archethic/p2p/message.ex | 63 +++++++++++++++---- .../p2p/message/get_transaction_inputs.ex | 5 +- .../p2p/message/transaction_input_list.ex | 6 +- .../replication/transaction_context.ex | 5 +- lib/archethic/transaction_chain.ex | 41 +++++++++--- lib/archethic_web/graphql_schema.ex | 16 +++-- .../graphql_schema/integer_type.ex | 37 +++++++++++ lib/archethic_web/graphql_schema/page_type.ex | 22 ------- lib/archethic_web/graphql_schema/resolver.ex | 20 +++--- .../graphql_schema/transaction_type.ex | 9 ++- .../live/transaction_details_live.ex | 25 +++----- test/archethic/transaction_chain_test.exs | 5 +- test/archethic_test.exs | 3 +- 14 files changed, 197 insertions(+), 85 deletions(-) create mode 100644 lib/archethic_web/graphql_schema/integer_type.ex delete mode 100644 lib/archethic_web/graphql_schema/page_type.ex diff --git a/lib/archethic.ex b/lib/archethic.ex index df3adefae..98b9ececa 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -166,8 +166,7 @@ defmodule Archethic do @doc """ Request to fetch the inputs for a transaction address from the closest nodes """ - @spec get_transaction_inputs(binary()) :: - {:ok, list(TransactionInput.t())} | {:error, :network_issue} + @spec get_transaction_inputs(binary()) :: list(TransactionInput.t()) def get_transaction_inputs(address) when is_binary(address) do nodes = address @@ -175,7 +174,27 @@ defmodule Archethic do |> P2P.nearest_nodes() |> Enum.filter(&Node.locally_available?/1) - TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now()) + address + |> TransactionChain.stream_inputs_remotely(nodes, DateTime.utc_now()) + |> Enum.to_list() + end + + @doc """ + Request to fetch the inputs for a transaction address from the closest nodes at a given page + """ + @spec get_transaction_inputs(binary(), non_neg_integer()) :: list(TransactionInput.t()) + def get_transaction_inputs(address, page) + when is_binary(address) and is_integer(page) and page >= 0 do + nodes = + address + |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) + |> P2P.nearest_nodes() + |> Enum.filter(&Node.locally_available?/1) + + {inputs, _more?, _offset} = + TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now(), page) + + inputs end @doc """ diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index 43ae7ae2c..448e64cb7 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -187,8 +187,6 @@ defmodule Archethic.P2P.Message do # length(addresses) * trunc(beacon_summary_high_estimation_bytes / @floor_upload_speed * 1000) # end - def get_timeout(%GetTransactionInputs{}), do: get_max_timeout() - def get_timeout(_), do: 3_000 @doc """ @@ -346,8 +344,8 @@ defmodule Archethic.P2P.Message do <<16::8, address::binary>> end - def encode(%GetTransactionInputs{address: address}) do - <<17::8, address::binary>> + def encode(%GetTransactionInputs{address: address, offset: offset}) do + <<17::8, address::binary, VarInt.from_value(offset)::binary>> end def encode(%GetTransactionChainLength{address: address}) do @@ -489,7 +487,7 @@ defmodule Archethic.P2P.Message do <<243::8, bit_size(view)::8, view::bitstring>> end - def encode(%TransactionInputList{inputs: inputs}) do + def encode(%TransactionInputList{inputs: inputs, more?: more?, offset: offset}) do inputs_bin = inputs |> Stream.map(&TransactionInput.serialize/1) @@ -498,7 +496,10 @@ defmodule Archethic.P2P.Message do encoded_inputs_length = length(inputs) |> VarInt.from_value() - <<244::8, encoded_inputs_length::binary, inputs_bin::bitstring>> + more_bit = if more?, do: 1, else: 0 + + <<244::8, encoded_inputs_length::binary, inputs_bin::bitstring, more_bit::1, + VarInt.from_value(offset)::binary>> end def encode(%TransactionChainLength{length: length}) do @@ -823,7 +824,8 @@ defmodule Archethic.P2P.Message do def decode(<<17::8, rest::bitstring>>) do {address, rest} = Utils.deserialize_address(rest) - {%GetTransactionInputs{address: address}, rest} + {offset, rest} = VarInt.get_value(rest) + {%GetTransactionInputs{address: address, offset: offset}, rest} end def decode(<<18::8, rest::bitstring>>) do @@ -1034,10 +1036,18 @@ defmodule Archethic.P2P.Message do def decode(<<244::8, rest::bitstring>>) do {nb_inputs, rest} = rest |> VarInt.get_value() - {inputs, rest} = deserialize_transaction_inputs(rest, nb_inputs, []) + + {inputs, <>} = + deserialize_transaction_inputs(rest, nb_inputs, []) + + more? = more_bit == 1 + + {offset, rest} = VarInt.get_value(rest) {%TransactionInputList{ - inputs: inputs + inputs: inputs, + more?: more?, + offset: offset }, rest} end @@ -1485,7 +1495,7 @@ defmodule Archethic.P2P.Message do } end - def process(%GetTransactionInputs{address: address}) do + def process(%GetTransactionInputs{address: address, offset: offset}) do contract_inputs = address |> Contracts.list_contract_transactions() @@ -1493,8 +1503,39 @@ defmodule Archethic.P2P.Message do %TransactionInput{from: address, type: :call, timestamp: timestamp} end) + inputs = Account.get_inputs(address) ++ contract_inputs + inputs_length = length(inputs) + + %{inputs: inputs, offset: offset, more?: more?} = + inputs + |> Enum.with_index() + |> Enum.drop(offset) + |> Enum.reduce_while(%{inputs: [], offset: 0, more?: false}, fn {input, index}, acc -> + acc_size = + acc.inputs + |> Enum.map(&TransactionInput.serialize/1) + |> :erlang.list_to_bitstring() + |> byte_size() + + input_size = TransactionInput.serialize(input) |> byte_size + + if acc_size + input_size < 3_000_000 do + new_acc = + acc + |> Map.update!(:inputs, &[input | &1]) + |> Map.put(:offset, index + 1) + |> Map.put(:more?, index + 1 < inputs_length) + + {:cont, new_acc} + else + {:halt, acc} + end + end) + %TransactionInputList{ - inputs: Account.get_inputs(address) ++ contract_inputs + inputs: inputs, + more?: more?, + offset: offset } end diff --git a/lib/archethic/p2p/message/get_transaction_inputs.ex b/lib/archethic/p2p/message/get_transaction_inputs.ex index a40beef6d..78851a6c7 100644 --- a/lib/archethic/p2p/message/get_transaction_inputs.ex +++ b/lib/archethic/p2p/message/get_transaction_inputs.ex @@ -3,11 +3,12 @@ defmodule Archethic.P2P.Message.GetTransactionInputs do Represents a message with to request the inputs (spent or unspents) from a transaction """ @enforce_keys [:address] - defstruct [:address] + defstruct [:address, offset: 0] alias Archethic.Crypto @type t :: %__MODULE__{ - address: Crypto.versioned_hash() + address: Crypto.versioned_hash(), + offset: non_neg_integer() } end diff --git a/lib/archethic/p2p/message/transaction_input_list.ex b/lib/archethic/p2p/message/transaction_input_list.ex index 81ca9c566..afac9d71c 100644 --- a/lib/archethic/p2p/message/transaction_input_list.ex +++ b/lib/archethic/p2p/message/transaction_input_list.ex @@ -2,11 +2,13 @@ defmodule Archethic.P2P.Message.TransactionInputList do @moduledoc """ Represents a message with a list of transaction inputs """ - defstruct inputs: [] + defstruct inputs: [], more?: false, offset: 0 alias Archethic.TransactionChain.TransactionInput @type t() :: %__MODULE__{ - inputs: list(TransactionInput.t()) + inputs: list(TransactionInput.t()), + more?: boolean(), + offset: non_neg_integer() } end diff --git a/lib/archethic/replication/transaction_context.ex b/lib/archethic/replication/transaction_context.ex index c321f8c22..e0d64aa94 100644 --- a/lib/archethic/replication/transaction_context.ex +++ b/lib/archethic/replication/transaction_context.ex @@ -59,8 +59,9 @@ defmodule Archethic.Replication.TransactionContext do def fetch_transaction_inputs(address, timestamp = %DateTime{}) when is_binary(address) do nodes = replication_nodes(address) - {:ok, inputs} = TransactionChain.fetch_inputs_remotely(address, nodes, timestamp) - inputs + address + |> TransactionChain.stream_inputs_remotely(nodes, timestamp) + |> Enum.to_list() end defp replication_nodes(address) do diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index e29accdf1..2c1ad5371 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -722,6 +722,30 @@ defmodule Archethic.TransactionChain do end end + @doc """ + Stream the trnasaction inputs for a transaction address at a given time + """ + @spec stream_inputs_remotely(binary(), list(Node.t()), DateTime.t()) :: + Enumerable.t() | list(TransactionInput.t()) + def stream_inputs_remotely(_, [], _, _), do: [] + + def stream_inputs_remotely(address, nodes, timestamp) do + Stream.resource( + fn -> fetch_inputs_remotely(address, nodes, timestamp) end, + fn + {inputs, true, offset} -> + {inputs, fetch_inputs_remotely(address, nodes, timestamp, offset)} + + {inputs, false, _} -> + {inputs, :eof} + + :eof -> + {:halt, nil} + end, + fn _ -> :ok end + ) + end + @doc """ Fetch the transaction inputs for a transaction address at a given time @@ -729,11 +753,12 @@ defmodule Archethic.TransactionChain do If no nodes are able to answer the request, `{:error, :network_issue}` is returned. """ @spec fetch_inputs_remotely(address :: Crypto.versioned_hash(), list(Node.t()), DateTime.t()) :: - {:ok, list(TransactionInput.t())} | {:error, :network_issue} - def fetch_inputs_remotely(_, [], _), do: {:ok, []} + {inputs :: list(TransactionInput.t()), more? :: boolean(), offset :: non_neg_integer()} + def fetch_inputs_remotely(address, nodes, timestamp, offset \\ 0) + def fetch_inputs_remotely(_, [], _, _), do: {[], false, 0} - def fetch_inputs_remotely(address, nodes, timestamp = %DateTime{}) - when is_binary(address) and is_list(nodes) do + def fetch_inputs_remotely(address, nodes, timestamp = %DateTime{}, offset) + when is_binary(address) and is_list(nodes) and is_integer(offset) and offset >= 0 do conflict_resolver = fn results -> results |> Enum.sort_by(&length(&1.inputs), :desc) @@ -742,15 +767,15 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, - %GetTransactionInputs{address: address}, + %GetTransactionInputs{address: address, offset: offset}, conflict_resolver ) do - {:ok, %TransactionInputList{inputs: inputs}} -> + {:ok, %TransactionInputList{inputs: inputs, more?: more?, offset: offset}} -> filtered_inputs = Enum.filter(inputs, &(DateTime.diff(&1.timestamp, timestamp) <= 0)) - {:ok, filtered_inputs} + {filtered_inputs, more?, offset} {:error, :network_issue} -> - {:error, :network_issue} + {[], false, 0} end end diff --git a/lib/archethic_web/graphql_schema.ex b/lib/archethic_web/graphql_schema.ex index b9461bab0..9fcfb3b1d 100644 --- a/lib/archethic_web/graphql_schema.ex +++ b/lib/archethic_web/graphql_schema.ex @@ -9,7 +9,7 @@ defmodule ArchethicWeb.GraphQLSchema do alias __MODULE__.Resolver alias __MODULE__.SharedSecretsType alias __MODULE__.TransactionType - alias __MODULE__.PageType + alias __MODULE__.IntegerType alias __MODULE__.TransactionAttestation alias __MODULE__.TransactionError alias __MODULE__.OracleData @@ -21,7 +21,7 @@ defmodule ArchethicWeb.GraphQLSchema do import_types(P2PType) import_types(TransactionAttestation) import_types(TransactionError) - import_types(PageType) + import_types(IntegerType) import_types(OracleData) query do @@ -51,7 +51,7 @@ defmodule ArchethicWeb.GraphQLSchema do Query the network to find all the transactions locally stored """ field :transactions, list_of(:transaction) do - arg(:page, :page) + arg(:page, :pos_integer) resolve(fn args, _ -> page = Map.get(args, :page, 1) @@ -99,9 +99,13 @@ defmodule ArchethicWeb.GraphQLSchema do """ field :transaction_inputs, list_of(:transaction_input) do arg(:address, non_null(:address)) + arg(:paging_offset, :non_neg_integer) + arg(:limit, :pos_integer) - resolve(fn %{address: address}, _ -> - Resolver.get_inputs(address) + resolve(fn args = %{address: address}, _ -> + paging_offset = Map.get(args, :paging_offset, 0) + limit = Map.get(args, :limit, 0) + Resolver.get_inputs(address, paging_offset, limit) end) end @@ -122,7 +126,7 @@ defmodule ArchethicWeb.GraphQLSchema do """ field :network_transactions, list_of(:transaction) do arg(:type, non_null(:transaction_type)) - arg(:page, :page) + arg(:page, :pos_integer) resolve(fn args, _ -> type = Map.get(args, :type) diff --git a/lib/archethic_web/graphql_schema/integer_type.ex b/lib/archethic_web/graphql_schema/integer_type.ex new file mode 100644 index 000000000..05d8b4626 --- /dev/null +++ b/lib/archethic_web/graphql_schema/integer_type.ex @@ -0,0 +1,37 @@ +defmodule ArchethicWeb.GraphQLSchema.IntegerType do + @moduledoc false + + use Absinthe.Schema.Notation + + @desc """ + The [Positive Integer] scalar type represents a positive number + """ + scalar :pos_integer do + parse(&do_parse_pos_integer/1) + end + + @desc """ + The [Non Negative Integer] scalar type represents a non negative number + """ + scalar :non_neg_integer do + parse(&do_parse_non_neg_integer/1) + end + + defp do_parse_pos_integer(%Absinthe.Blueprint.Input.Integer{value: integer}) + when integer >= 1 do + {:ok, integer} + end + + defp do_parse_pos_integer(_) do + :error + end + + defp do_parse_non_neg_integer(%Absinthe.Blueprint.Input.Integer{value: integer}) + when integer >= 0 do + {:ok, integer} + end + + defp do_parse_non_neg_integer(_) do + :error + end +end diff --git a/lib/archethic_web/graphql_schema/page_type.ex b/lib/archethic_web/graphql_schema/page_type.ex deleted file mode 100644 index 058acfbdb..000000000 --- a/lib/archethic_web/graphql_schema/page_type.ex +++ /dev/null @@ -1,22 +0,0 @@ -defmodule ArchethicWeb.GraphQLSchema.PageType do - @moduledoc false - - use Absinthe.Schema.Notation - - @desc """ - The [Page] scalar type represents the page number - """ - scalar :page do - parse(&do_parse/1) - end - - @spec do_parse(Absinthe.Blueprint.Input.Integer.t()) :: - {:ok, integer()} | :error - defp do_parse(%Absinthe.Blueprint.Input.Integer{value: page_value}) when page_value >= 1 do - {:ok, page_value} - end - - defp do_parse(_page) do - :error - end -end diff --git a/lib/archethic_web/graphql_schema/resolver.ex b/lib/archethic_web/graphql_schema/resolver.ex index e16bcffa1..83c649943 100644 --- a/lib/archethic_web/graphql_schema/resolver.ex +++ b/lib/archethic_web/graphql_schema/resolver.ex @@ -122,13 +122,19 @@ defmodule ArchethicWeb.GraphQLSchema.Resolver do end) end - def get_inputs(address) do - case Archethic.get_transaction_inputs(address) do - {:ok, inputs} -> - {:ok, Enum.map(inputs, &TransactionInput.to_map/1)} - - {:error, _} = e -> - e + def get_inputs(address, paging_offset \\ 0, limit \\ 0) do + inputs = + address + |> Archethic.get_transaction_inputs(paging_offset) + |> Enum.map(&TransactionInput.to_map/1) + |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) + + case limit do + 0 -> + {:ok, inputs} + + limit -> + {:ok, Enum.take(inputs, limit)} end end diff --git a/lib/archethic_web/graphql_schema/transaction_type.ex b/lib/archethic_web/graphql_schema/transaction_type.ex index dd1af6f33..733f81053 100644 --- a/lib/archethic_web/graphql_schema/transaction_type.ex +++ b/lib/archethic_web/graphql_schema/transaction_type.ex @@ -25,8 +25,13 @@ defmodule ArchethicWeb.GraphQLSchema.TransactionType do field(:cross_validation_stamps, list_of(:cross_validation_stamp)) field :inputs, list_of(:transaction_input) do - resolve(fn _, %{source: %{address: address}} -> - Resolver.get_inputs(address) + arg(:paging_offset, :non_neg_integer) + arg(:limit, :pos_integer) + + resolve(fn args, %{source: %{address: address}} -> + paging_offset = Map.get(args, :paging_offset, 0) + limit = Map.get(args, :limit, 0) + Resolver.get_inputs(address, paging_offset, limit) end) end diff --git a/lib/archethic_web/live/transaction_details_live.ex b/lib/archethic_web/live/transaction_details_live.ex index 489efe33a..9ab946683 100644 --- a/lib/archethic_web/live/transaction_details_live.ex +++ b/lib/archethic_web/live/transaction_details_live.ex @@ -83,7 +83,7 @@ defmodule ArchethicWeb.TransactionDetailsLive do previous_address = Transaction.previous_address(tx) with {:ok, balance} <- Archethic.get_balance(address), - {:ok, inputs} <- Archethic.get_transaction_inputs(address) do + inputs <- Archethic.get_transaction_inputs(address) do ledger_inputs = Enum.reject(inputs, &(&1.type == :call)) contract_inputs = Enum.filter(inputs, &(&1.type == :call)) uco_price_at_time = tx.validation_stamp.timestamp |> OracleChain.get_uco_price() @@ -107,22 +107,15 @@ defmodule ArchethicWeb.TransactionDetailsLive do end defp handle_not_existing_transaction(socket, address) do - case Archethic.get_transaction_inputs(address) do - {:ok, inputs} -> - ledger_inputs = Enum.reject(inputs, &(&1.type == :call)) - contract_inputs = Enum.filter(inputs, &(&1.type == :call)) + inputs = Archethic.get_transaction_inputs(address) + ledger_inputs = Enum.reject(inputs, &(&1.type == :call)) + contract_inputs = Enum.filter(inputs, &(&1.type == :call)) - socket - |> assign(:address, address) - |> assign(:inputs, ledger_inputs) - |> assign(:calls, contract_inputs) - |> assign(:error, :not_exists) - - {:error, :network_issue} -> - socket - |> assign(:address, address) - |> assign(:error, :network_issue) - end + socket + |> assign(:address, address) + |> assign(:inputs, ledger_inputs) + |> assign(:calls, contract_inputs) + |> assign(:error, :not_exists) end defp handle_invalid_address(socket, address) do diff --git a/test/archethic/transaction_chain_test.exs b/test/archethic/transaction_chain_test.exs index f43435755..ca4e62dfb 100644 --- a/test/archethic/transaction_chain_test.exs +++ b/test/archethic/transaction_chain_test.exs @@ -332,7 +332,7 @@ defmodule Archethic.TransactionChainTest do }} end) - assert {:ok, [%TransactionInput{from: "Alice2", amount: 10, type: :UCO}]} = + assert {[%TransactionInput{from: "Alice2", amount: 10, type: :UCO}], _more?, _offset} = TransactionChain.fetch_inputs_remotely("Alice1", nodes, DateTime.utc_now()) end @@ -398,7 +398,8 @@ defmodule Archethic.TransactionChainTest do }} end) - assert {:ok, [%TransactionInput{from: "Alice2"}, %TransactionInput{from: "Bob3"}]} = + assert {[%TransactionInput{from: "Alice2"}, %TransactionInput{from: "Bob3"}], _more?, + _offset} = TransactionChain.fetch_inputs_remotely("Alice1", nodes, DateTime.utc_now()) end end diff --git a/test/archethic_test.exs b/test/archethic_test.exs index 90803406d..b30fbccc2 100644 --- a/test/archethic_test.exs +++ b/test/archethic_test.exs @@ -269,8 +269,7 @@ defmodule ArchethicTest do }} end) - assert {:ok, - [%TransactionInput{from: "@Bob3", amount: 1_000_000_000, spent?: false, type: :UCO}]} = + assert [%TransactionInput{from: "@Bob3", amount: 1_000_000_000, spent?: false, type: :UCO}] = Archethic.get_transaction_inputs("@Alice2") end end From ab9ac91ed57be5e321ba984ac241470e3a3afedc Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Thu, 15 Sep 2022 18:17:34 +0200 Subject: [PATCH 3/6] Revert timeout --- lib/archethic/p2p/message.ex | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index 448e64cb7..3318d2da7 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -174,6 +174,10 @@ defmodule Archethic.P2P.Message do @spec get_timeout(__MODULE__.t()) :: non_neg_integer() def get_timeout(%GetTransaction{}), do: get_max_timeout() def get_timeout(%GetLastTransaction{}), do: get_max_timeout() + def get_timeout(%NewTransaction{}), do: get_max_timeout() + def get_timeout(%StartMining{}), do: get_max_timeout() + def get_timeout(%ReplicateTransaction{}), do: get_max_timeout() + def get_timeout(%ReplicateTransactionChain{}), do: get_max_timeout() def get_timeout(%GetTransactionChain{}) do # As we use 10 transaction in the pagination we can estimate the max time From a6a2eb7765a6aeede55458bda04bdf35fc6e6e03 Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Thu, 15 Sep 2022 20:00:01 +0200 Subject: [PATCH 4/6] Sort on the P2P message handle instead of GraphQL --- lib/archethic/p2p/message.ex | 2 ++ lib/archethic_web/graphql_schema/resolver.ex | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index 3318d2da7..9d9ce1c4e 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -1303,6 +1303,7 @@ defmodule Archethic.P2P.Message do %{utxos: utxos, offset: offset, more?: more?} = utxos + |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) |> Enum.with_index() |> Enum.drop(offset) |> Enum.reduce_while(%{utxos: [], offset: 0, more?: false}, fn {utxo, index}, acc -> @@ -1512,6 +1513,7 @@ defmodule Archethic.P2P.Message do %{inputs: inputs, offset: offset, more?: more?} = inputs + |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) |> Enum.with_index() |> Enum.drop(offset) |> Enum.reduce_while(%{inputs: [], offset: 0, more?: false}, fn {input, index}, acc -> diff --git a/lib/archethic_web/graphql_schema/resolver.ex b/lib/archethic_web/graphql_schema/resolver.ex index 83c649943..2294b9082 100644 --- a/lib/archethic_web/graphql_schema/resolver.ex +++ b/lib/archethic_web/graphql_schema/resolver.ex @@ -127,7 +127,6 @@ defmodule ArchethicWeb.GraphQLSchema.Resolver do address |> Archethic.get_transaction_inputs(paging_offset) |> Enum.map(&TransactionInput.to_map/1) - |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) case limit do 0 -> From 8abd27fbea7ef5459b750757cbef1e9948057b54 Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Thu, 15 Sep 2022 20:44:32 +0200 Subject: [PATCH 5/6] Define the limit on the P2P message instead of GraphQL --- lib/archethic.ex | 13 +++++++---- lib/archethic/p2p/message.ex | 23 ++++++++++++++----- .../p2p/message/get_transaction_inputs.ex | 5 ++-- lib/archethic/transaction_chain.ex | 18 ++++++++++----- lib/archethic_web/graphql_schema/resolver.ex | 2 +- 5 files changed, 42 insertions(+), 19 deletions(-) diff --git a/lib/archethic.ex b/lib/archethic.ex index 98b9ececa..1827ae722 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -182,9 +182,14 @@ defmodule Archethic do @doc """ Request to fetch the inputs for a transaction address from the closest nodes at a given page """ - @spec get_transaction_inputs(binary(), non_neg_integer()) :: list(TransactionInput.t()) - def get_transaction_inputs(address, page) - when is_binary(address) and is_integer(page) and page >= 0 do + @spec get_transaction_inputs( + binary(), + paging_offset :: non_neg_integer(), + limit :: non_neg_integer() + ) :: list(TransactionInput.t()) + def get_transaction_inputs(address, page, limit) + when is_binary(address) and is_integer(page) and page >= 0 and is_integer(limit) and + limit >= 0 do nodes = address |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) @@ -192,7 +197,7 @@ defmodule Archethic do |> Enum.filter(&Node.locally_available?/1) {inputs, _more?, _offset} = - TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now(), page) + TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now(), page, limit) inputs end diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index 9d9ce1c4e..e8cab3772 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -348,8 +348,9 @@ defmodule Archethic.P2P.Message do <<16::8, address::binary>> end - def encode(%GetTransactionInputs{address: address, offset: offset}) do - <<17::8, address::binary, VarInt.from_value(offset)::binary>> + def encode(%GetTransactionInputs{address: address, offset: offset, limit: limit}) do + <<17::8, address::binary, VarInt.from_value(offset)::binary, + VarInt.from_value(limit)::binary>> end def encode(%GetTransactionChainLength{address: address}) do @@ -829,7 +830,8 @@ defmodule Archethic.P2P.Message do def decode(<<17::8, rest::bitstring>>) do {address, rest} = Utils.deserialize_address(rest) {offset, rest} = VarInt.get_value(rest) - {%GetTransactionInputs{address: address, offset: offset}, rest} + {limit, rest} = VarInt.get_value(rest) + {%GetTransactionInputs{address: address, offset: offset, limit: limit}, rest} end def decode(<<18::8, rest::bitstring>>) do @@ -1500,7 +1502,7 @@ defmodule Archethic.P2P.Message do } end - def process(%GetTransactionInputs{address: address, offset: offset}) do + def process(%GetTransactionInputs{address: address, offset: offset, limit: limit}) do contract_inputs = address |> Contracts.list_contract_transactions() @@ -1525,7 +1527,16 @@ defmodule Archethic.P2P.Message do input_size = TransactionInput.serialize(input) |> byte_size - if acc_size + input_size < 3_000_000 do + size_capacity? = acc_size + input_size < 3_000_000 + + should_take_more? = + if limit > 0 do + length(acc.inputs) < limit and size_capacity? + else + size_capacity? + end + + if should_take_more? do new_acc = acc |> Map.update!(:inputs, &[input | &1]) @@ -1539,7 +1550,7 @@ defmodule Archethic.P2P.Message do end) %TransactionInputList{ - inputs: inputs, + inputs: Enum.reverse(inputs), more?: more?, offset: offset } diff --git a/lib/archethic/p2p/message/get_transaction_inputs.ex b/lib/archethic/p2p/message/get_transaction_inputs.ex index 78851a6c7..4ecf08e2f 100644 --- a/lib/archethic/p2p/message/get_transaction_inputs.ex +++ b/lib/archethic/p2p/message/get_transaction_inputs.ex @@ -3,12 +3,13 @@ defmodule Archethic.P2P.Message.GetTransactionInputs do Represents a message with to request the inputs (spent or unspents) from a transaction """ @enforce_keys [:address] - defstruct [:address, offset: 0] + defstruct [:address, offset: 0, limit: 0] alias Archethic.Crypto @type t :: %__MODULE__{ address: Crypto.versioned_hash(), - offset: non_neg_integer() + offset: non_neg_integer(), + limit: non_neg_integer() } end diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 2c1ad5371..076b8becd 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -752,13 +752,19 @@ defmodule Archethic.TransactionChain do If the inputs exist, then they are returned in the shape of `{:ok, inputs}`. If no nodes are able to answer the request, `{:error, :network_issue}` is returned. """ - @spec fetch_inputs_remotely(address :: Crypto.versioned_hash(), list(Node.t()), DateTime.t()) :: + @spec fetch_inputs_remotely( + address :: Crypto.versioned_hash(), + list(Node.t()), + DateTime.t(), + limit :: non_neg_integer() + ) :: {inputs :: list(TransactionInput.t()), more? :: boolean(), offset :: non_neg_integer()} - def fetch_inputs_remotely(address, nodes, timestamp, offset \\ 0) - def fetch_inputs_remotely(_, [], _, _), do: {[], false, 0} + def fetch_inputs_remotely(address, nodes, timestamp, offset \\ 0, limit \\ 0) + def fetch_inputs_remotely(_, [], _, _, _), do: {[], false, 0} - def fetch_inputs_remotely(address, nodes, timestamp = %DateTime{}, offset) - when is_binary(address) and is_list(nodes) and is_integer(offset) and offset >= 0 do + def fetch_inputs_remotely(address, nodes, timestamp = %DateTime{}, offset, limit) + when is_binary(address) and is_list(nodes) and is_integer(offset) and offset >= 0 and + is_integer(limit) and limit >= 0 do conflict_resolver = fn results -> results |> Enum.sort_by(&length(&1.inputs), :desc) @@ -767,7 +773,7 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, - %GetTransactionInputs{address: address, offset: offset}, + %GetTransactionInputs{address: address, offset: offset, limit: limit}, conflict_resolver ) do {:ok, %TransactionInputList{inputs: inputs, more?: more?, offset: offset}} -> diff --git a/lib/archethic_web/graphql_schema/resolver.ex b/lib/archethic_web/graphql_schema/resolver.ex index 2294b9082..408e8cd09 100644 --- a/lib/archethic_web/graphql_schema/resolver.ex +++ b/lib/archethic_web/graphql_schema/resolver.ex @@ -125,7 +125,7 @@ defmodule ArchethicWeb.GraphQLSchema.Resolver do def get_inputs(address, paging_offset \\ 0, limit \\ 0) do inputs = address - |> Archethic.get_transaction_inputs(paging_offset) + |> Archethic.get_transaction_inputs(paging_offset, limit) |> Enum.map(&TransactionInput.to_map/1) case limit do From 2c8d434bb97139fbbcb87ee822308d95176b89f7 Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Thu, 15 Sep 2022 23:41:02 +0200 Subject: [PATCH 6/6] Fix dialyzer issue (recent Elixir/Erlang version) --- lib/archethic/networking/ip_lookup.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/archethic/networking/ip_lookup.ex b/lib/archethic/networking/ip_lookup.ex index 381d90020..d0ad93f52 100644 --- a/lib/archethic/networking/ip_lookup.ex +++ b/lib/archethic/networking/ip_lookup.ex @@ -39,7 +39,7 @@ defmodule Archethic.Networking.IPLookup do ip {:error, reason} -> - fallback(RemoteDiscovery, reason) + raise "Cannot use remote discovery IP lookup - #{inspect(reason)}" end end