diff --git a/lib/archethic.ex b/lib/archethic.ex index df3adefae3..98b9ececaf 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -166,8 +166,7 @@ defmodule Archethic do @doc """ Request to fetch the inputs for a transaction address from the closest nodes """ - @spec get_transaction_inputs(binary()) :: - {:ok, list(TransactionInput.t())} | {:error, :network_issue} + @spec get_transaction_inputs(binary()) :: list(TransactionInput.t()) def get_transaction_inputs(address) when is_binary(address) do nodes = address @@ -175,7 +174,27 @@ defmodule Archethic do |> P2P.nearest_nodes() |> Enum.filter(&Node.locally_available?/1) - TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now()) + address + |> TransactionChain.stream_inputs_remotely(nodes, DateTime.utc_now()) + |> Enum.to_list() + end + + @doc """ + Request to fetch the inputs for a transaction address from the closest nodes at a given page + """ + @spec get_transaction_inputs(binary(), non_neg_integer()) :: list(TransactionInput.t()) + def get_transaction_inputs(address, page) + when is_binary(address) and is_integer(page) and page >= 0 do + nodes = + address + |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) + |> P2P.nearest_nodes() + |> Enum.filter(&Node.locally_available?/1) + + {inputs, _more?, _offset} = + TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now(), page) + + inputs end @doc """ diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index 43ae7ae2c6..448e64cb75 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -187,8 +187,6 @@ defmodule Archethic.P2P.Message do # length(addresses) * trunc(beacon_summary_high_estimation_bytes / @floor_upload_speed * 1000) # end - def get_timeout(%GetTransactionInputs{}), do: get_max_timeout() - def get_timeout(_), do: 3_000 @doc """ @@ -346,8 +344,8 @@ defmodule Archethic.P2P.Message do <<16::8, address::binary>> end - def encode(%GetTransactionInputs{address: address}) do - <<17::8, address::binary>> + def encode(%GetTransactionInputs{address: address, offset: offset}) do + <<17::8, address::binary, VarInt.from_value(offset)::binary>> end def encode(%GetTransactionChainLength{address: address}) do @@ -489,7 +487,7 @@ defmodule Archethic.P2P.Message do <<243::8, bit_size(view)::8, view::bitstring>> end - def encode(%TransactionInputList{inputs: inputs}) do + def encode(%TransactionInputList{inputs: inputs, more?: more?, offset: offset}) do inputs_bin = inputs |> Stream.map(&TransactionInput.serialize/1) @@ -498,7 +496,10 @@ defmodule Archethic.P2P.Message do encoded_inputs_length = length(inputs) |> VarInt.from_value() - <<244::8, encoded_inputs_length::binary, inputs_bin::bitstring>> + more_bit = if more?, do: 1, else: 0 + + <<244::8, encoded_inputs_length::binary, inputs_bin::bitstring, more_bit::1, + VarInt.from_value(offset)::binary>> end def encode(%TransactionChainLength{length: length}) do @@ -823,7 +824,8 @@ defmodule Archethic.P2P.Message do def decode(<<17::8, rest::bitstring>>) do {address, rest} = Utils.deserialize_address(rest) - {%GetTransactionInputs{address: address}, rest} + {offset, rest} = VarInt.get_value(rest) + {%GetTransactionInputs{address: address, offset: offset}, rest} end def decode(<<18::8, rest::bitstring>>) do @@ -1034,10 +1036,18 @@ defmodule Archethic.P2P.Message do def decode(<<244::8, rest::bitstring>>) do {nb_inputs, rest} = rest |> VarInt.get_value() - {inputs, rest} = deserialize_transaction_inputs(rest, nb_inputs, []) + + {inputs, <>} = + deserialize_transaction_inputs(rest, nb_inputs, []) + + more? = more_bit == 1 + + {offset, rest} = VarInt.get_value(rest) {%TransactionInputList{ - inputs: inputs + inputs: inputs, + more?: more?, + offset: offset }, rest} end @@ -1485,7 +1495,7 @@ defmodule Archethic.P2P.Message do } end - def process(%GetTransactionInputs{address: address}) do + def process(%GetTransactionInputs{address: address, offset: offset}) do contract_inputs = address |> Contracts.list_contract_transactions() @@ -1493,8 +1503,39 @@ defmodule Archethic.P2P.Message do %TransactionInput{from: address, type: :call, timestamp: timestamp} end) + inputs = Account.get_inputs(address) ++ contract_inputs + inputs_length = length(inputs) + + %{inputs: inputs, offset: offset, more?: more?} = + inputs + |> Enum.with_index() + |> Enum.drop(offset) + |> Enum.reduce_while(%{inputs: [], offset: 0, more?: false}, fn {input, index}, acc -> + acc_size = + acc.inputs + |> Enum.map(&TransactionInput.serialize/1) + |> :erlang.list_to_bitstring() + |> byte_size() + + input_size = TransactionInput.serialize(input) |> byte_size + + if acc_size + input_size < 3_000_000 do + new_acc = + acc + |> Map.update!(:inputs, &[input | &1]) + |> Map.put(:offset, index + 1) + |> Map.put(:more?, index + 1 < inputs_length) + + {:cont, new_acc} + else + {:halt, acc} + end + end) + %TransactionInputList{ - inputs: Account.get_inputs(address) ++ contract_inputs + inputs: inputs, + more?: more?, + offset: offset } end diff --git a/lib/archethic/p2p/message/get_transaction_inputs.ex b/lib/archethic/p2p/message/get_transaction_inputs.ex index a40beef6db..78851a6c7f 100644 --- a/lib/archethic/p2p/message/get_transaction_inputs.ex +++ b/lib/archethic/p2p/message/get_transaction_inputs.ex @@ -3,11 +3,12 @@ defmodule Archethic.P2P.Message.GetTransactionInputs do Represents a message with to request the inputs (spent or unspents) from a transaction """ @enforce_keys [:address] - defstruct [:address] + defstruct [:address, offset: 0] alias Archethic.Crypto @type t :: %__MODULE__{ - address: Crypto.versioned_hash() + address: Crypto.versioned_hash(), + offset: non_neg_integer() } end diff --git a/lib/archethic/p2p/message/transaction_input_list.ex b/lib/archethic/p2p/message/transaction_input_list.ex index 81ca9c5667..afac9d71c1 100644 --- a/lib/archethic/p2p/message/transaction_input_list.ex +++ b/lib/archethic/p2p/message/transaction_input_list.ex @@ -2,11 +2,13 @@ defmodule Archethic.P2P.Message.TransactionInputList do @moduledoc """ Represents a message with a list of transaction inputs """ - defstruct inputs: [] + defstruct inputs: [], more?: false, offset: 0 alias Archethic.TransactionChain.TransactionInput @type t() :: %__MODULE__{ - inputs: list(TransactionInput.t()) + inputs: list(TransactionInput.t()), + more?: boolean(), + offset: non_neg_integer() } end diff --git a/lib/archethic/replication/transaction_context.ex b/lib/archethic/replication/transaction_context.ex index c321f8c224..e0d64aa94e 100644 --- a/lib/archethic/replication/transaction_context.ex +++ b/lib/archethic/replication/transaction_context.ex @@ -59,8 +59,9 @@ defmodule Archethic.Replication.TransactionContext do def fetch_transaction_inputs(address, timestamp = %DateTime{}) when is_binary(address) do nodes = replication_nodes(address) - {:ok, inputs} = TransactionChain.fetch_inputs_remotely(address, nodes, timestamp) - inputs + address + |> TransactionChain.stream_inputs_remotely(nodes, timestamp) + |> Enum.to_list() end defp replication_nodes(address) do diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 38f8d4b2ac..fc0a20cf4a 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -722,6 +722,30 @@ defmodule Archethic.TransactionChain do end end + @doc """ + Stream the trnasaction inputs for a transaction address at a given time + """ + @spec stream_inputs_remotely(binary(), list(Node.t()), DateTime.t()) :: + Enumerable.t() | list(TransactionInput.t()) + def stream_inputs_remotely(_, [], _, _), do: [] + + def stream_inputs_remotely(address, nodes, timestamp) do + Stream.resource( + fn -> fetch_inputs_remotely(address, nodes, timestamp) end, + fn + {inputs, true, offset} -> + {inputs, fetch_inputs_remotely(address, nodes, timestamp, offset)} + + {inputs, false, _} -> + {inputs, :eof} + + :eof -> + {:halt, nil} + end, + fn _ -> :ok end + ) + end + @doc """ Fetch the transaction inputs for a transaction address at a given time @@ -729,11 +753,12 @@ defmodule Archethic.TransactionChain do If no nodes are able to answer the request, `{:error, :network_issue}` is returned. """ @spec fetch_inputs_remotely(address :: Crypto.versioned_hash(), list(Node.t()), DateTime.t()) :: - {:ok, list(TransactionInput.t())} | {:error, :network_issue} - def fetch_inputs_remotely(_, [], _), do: {:ok, []} + {inputs :: list(TransactionInput.t()), more? :: boolean(), offset :: non_neg_integer()} + def fetch_inputs_remotely(address, nodes, timestamp, offset \\ 0) + def fetch_inputs_remotely(_, [], _, _), do: {[], false, 0} - def fetch_inputs_remotely(address, nodes, timestamp = %DateTime{}) - when is_binary(address) and is_list(nodes) do + def fetch_inputs_remotely(address, nodes, timestamp = %DateTime{}, offset) + when is_binary(address) and is_list(nodes) and is_integer(offset) and offset >= 0 do conflict_resolver = fn results -> results |> Enum.sort_by(&length(&1.inputs), :desc) @@ -742,15 +767,15 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, - %GetTransactionInputs{address: address}, + %GetTransactionInputs{address: address, offset: offset}, conflict_resolver ) do - {:ok, %TransactionInputList{inputs: inputs}} -> + {:ok, %TransactionInputList{inputs: inputs, more?: more?, offset: offset}} -> filtered_inputs = Enum.filter(inputs, &(DateTime.diff(&1.timestamp, timestamp) <= 0)) - {:ok, filtered_inputs} + {filtered_inputs, more?, offset} {:error, :network_issue} -> - {:error, :network_issue} + {[], false, 0} end end diff --git a/lib/archethic_web/graphql_schema.ex b/lib/archethic_web/graphql_schema.ex index b9461bab07..abc719cf39 100644 --- a/lib/archethic_web/graphql_schema.ex +++ b/lib/archethic_web/graphql_schema.ex @@ -99,9 +99,16 @@ defmodule ArchethicWeb.GraphQLSchema do """ field :transaction_inputs, list_of(:transaction_input) do arg(:address, non_null(:address)) + arg(:paging_offset, :paging_offset) - resolve(fn %{address: address}, _ -> - Resolver.get_inputs(address) + resolve(fn args = %{address: address}, _ -> + case Map.get(args, :paging_offset) do + nil -> + Resolver.get_inputs(address) + + page -> + Resolver.get_inputs(address, page) + end end) end diff --git a/lib/archethic_web/graphql_schema/page_type.ex b/lib/archethic_web/graphql_schema/page_type.ex index 058acfbdb3..035a8df480 100644 --- a/lib/archethic_web/graphql_schema/page_type.ex +++ b/lib/archethic_web/graphql_schema/page_type.ex @@ -10,8 +10,14 @@ defmodule ArchethicWeb.GraphQLSchema.PageType do parse(&do_parse/1) end - @spec do_parse(Absinthe.Blueprint.Input.Integer.t()) :: - {:ok, integer()} | :error + @desc """ + The [PagingOffset] scalar type represents the paging offset, which is the number of elements to skip + for the next reading + """ + scalar :paging_offset do + parse(&do_parse_offset/1) + end + defp do_parse(%Absinthe.Blueprint.Input.Integer{value: page_value}) when page_value >= 1 do {:ok, page_value} end @@ -19,4 +25,13 @@ defmodule ArchethicWeb.GraphQLSchema.PageType do defp do_parse(_page) do :error end + + defp do_parse_offset(%Absinthe.Blueprint.Input.Integer{value: page_value}) + when page_value >= 0 do + {:ok, page_value} + end + + defp do_parse_offset(_page) do + :error + end end diff --git a/lib/archethic_web/graphql_schema/resolver.ex b/lib/archethic_web/graphql_schema/resolver.ex index e16bcffa16..42d21c7c87 100644 --- a/lib/archethic_web/graphql_schema/resolver.ex +++ b/lib/archethic_web/graphql_schema/resolver.ex @@ -123,13 +123,21 @@ defmodule ArchethicWeb.GraphQLSchema.Resolver do end def get_inputs(address) do - case Archethic.get_transaction_inputs(address) do - {:ok, inputs} -> - {:ok, Enum.map(inputs, &TransactionInput.to_map/1)} + inputs = + address + |> Archethic.get_transaction_inputs() + |> Enum.map(&TransactionInput.to_map/1) - {:error, _} = e -> - e - end + {:ok, inputs} + end + + def get_inputs(address, paging_offset) do + inputs = + address + |> Archethic.get_transaction_inputs(paging_offset) + |> Enum.map(&TransactionInput.to_map/1) + + {:ok, inputs} end def shared_secrets do diff --git a/lib/archethic_web/live/transaction_details_live.ex b/lib/archethic_web/live/transaction_details_live.ex index 489efe33a7..9ab946683f 100644 --- a/lib/archethic_web/live/transaction_details_live.ex +++ b/lib/archethic_web/live/transaction_details_live.ex @@ -83,7 +83,7 @@ defmodule ArchethicWeb.TransactionDetailsLive do previous_address = Transaction.previous_address(tx) with {:ok, balance} <- Archethic.get_balance(address), - {:ok, inputs} <- Archethic.get_transaction_inputs(address) do + inputs <- Archethic.get_transaction_inputs(address) do ledger_inputs = Enum.reject(inputs, &(&1.type == :call)) contract_inputs = Enum.filter(inputs, &(&1.type == :call)) uco_price_at_time = tx.validation_stamp.timestamp |> OracleChain.get_uco_price() @@ -107,22 +107,15 @@ defmodule ArchethicWeb.TransactionDetailsLive do end defp handle_not_existing_transaction(socket, address) do - case Archethic.get_transaction_inputs(address) do - {:ok, inputs} -> - ledger_inputs = Enum.reject(inputs, &(&1.type == :call)) - contract_inputs = Enum.filter(inputs, &(&1.type == :call)) + inputs = Archethic.get_transaction_inputs(address) + ledger_inputs = Enum.reject(inputs, &(&1.type == :call)) + contract_inputs = Enum.filter(inputs, &(&1.type == :call)) - socket - |> assign(:address, address) - |> assign(:inputs, ledger_inputs) - |> assign(:calls, contract_inputs) - |> assign(:error, :not_exists) - - {:error, :network_issue} -> - socket - |> assign(:address, address) - |> assign(:error, :network_issue) - end + socket + |> assign(:address, address) + |> assign(:inputs, ledger_inputs) + |> assign(:calls, contract_inputs) + |> assign(:error, :not_exists) end defp handle_invalid_address(socket, address) do