From b964f8ed1932a2db5ff6e41b10da67d36faad9f8 Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Wed, 14 Sep 2022 11:49:02 +0200 Subject: [PATCH] Support stream of UTXO The aim is to reduce timeouts by streaming into batches < 3MB. This would be used to retrieve all the UTXO in the transaction context phase of the mining. --- lib/archethic/mining/transaction_context.ex | 27 ++------ lib/archethic/p2p/message.ex | 66 +++++++++++++++---- .../p2p/message/get_unspent_outputs.ex | 5 +- .../p2p/message/unspent_output_list.ex | 6 +- lib/archethic/transaction_chain.ex | 44 +++++++++---- 5 files changed, 97 insertions(+), 51 deletions(-) diff --git a/lib/archethic/mining/transaction_context.ex b/lib/archethic/mining/transaction_context.ex index 4be4318748..2895e5405c 100644 --- a/lib/archethic/mining/transaction_context.ex +++ b/lib/archethic/mining/transaction_context.ex @@ -59,18 +59,11 @@ defmodule Archethic.Mining.TransactionContext do |> Enum.uniq() prev_tx_task = request_previous_tx(previous_address, prev_tx_nodes_split) - utxo_task = request_utxo(previous_address, unspent_outputs_nodes_split) nodes_view_task = request_nodes_view(node_public_keys) prev_tx = Task.await(prev_tx_task, Message.get_max_timeout()) - utxos = Task.await(utxo_task) nodes_view = Task.await(nodes_view_task) - # involved_nodes = - # [prev_tx_node_involved, utxo_node_involved] - # |> Enum.filter(& &1) - # |> P2P.distinct_nodes() - %{ chain_nodes_view: chain_storage_nodes_view, beacon_nodes_view: beacon_storage_nodes_view, @@ -83,7 +76,13 @@ defmodule Archethic.Mining.TransactionContext do io_storage_node_public_keys ) - # TODO: remove the invovled nodes as not used anymore + utxos = + TransactionChain.stream_unspent_outputs_remotely( + previous_address, + unspent_outputs_nodes_split + ) + |> Enum.to_list() + {prev_tx, utxos, [], chain_storage_nodes_view, beacon_storage_nodes_view, io_storage_nodes_view} end @@ -117,18 +116,6 @@ defmodule Archethic.Mining.TransactionContext do end) end - defp 
request_utxo(previous_address, nodes) do - Task.Supervisor.async(TaskSupervisor, fn -> - {:ok, utxos} = - TransactionChain.fetch_unspent_outputs_remotely( - previous_address, - nodes - ) - - utxos - end) - end - defp request_nodes_view(node_public_keys) do Task.Supervisor.async(TaskSupervisor, fn -> Task.Supervisor.async_stream_nolink( diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index e4ba57360a..43ae7ae2c6 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -174,10 +174,6 @@ defmodule Archethic.P2P.Message do @spec get_timeout(__MODULE__.t()) :: non_neg_integer() def get_timeout(%GetTransaction{}), do: get_max_timeout() def get_timeout(%GetLastTransaction{}), do: get_max_timeout() - def get_timeout(%NewTransaction{}), do: get_max_timeout() - def get_timeout(%StartMining{}), do: get_max_timeout() - def get_timeout(%ReplicateTransaction{}), do: get_max_timeout() - def get_timeout(%ReplicateTransactionChain{}), do: get_max_timeout() def get_timeout(%GetTransactionChain{}) do # As we use 10 transaction in the pagination we can estimate the max time @@ -191,7 +187,6 @@ defmodule Archethic.P2P.Message do # length(addresses) * trunc(beacon_summary_high_estimation_bytes / @floor_upload_speed * 1000) # end - def get_timeout(%GetUnspentOutputs{}), do: get_max_timeout() def get_timeout(%GetTransactionInputs{}), do: get_max_timeout() def get_timeout(_), do: 3_000 @@ -249,8 +244,8 @@ defmodule Archethic.P2P.Message do <<4::8, tx_address::binary, byte_size(paging_state)::8, paging_state::binary>> end - def encode(%GetUnspentOutputs{address: tx_address}) do - <<5::8, tx_address::binary>> + def encode(%GetUnspentOutputs{address: tx_address, offset: offset}) do + <<5::8, tx_address::binary, VarInt.from_value(offset)::binary>> end def encode(%NewTransaction{transaction: tx}) do @@ -560,7 +555,7 @@ defmodule Archethic.P2P.Message do <<249::8, encoded_nodes_length::binary, nodes_bin::bitstring>> end - def 
encode(%UnspentOutputList{unspent_outputs: unspent_outputs}) do + def encode(%UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}) do unspent_outputs_bin = unspent_outputs |> Stream.map(&UnspentOutput.serialize/1) @@ -569,7 +564,10 @@ defmodule Archethic.P2P.Message do encoded_unspent_outputs_length = Enum.count(unspent_outputs) |> VarInt.from_value() - <<250::8, encoded_unspent_outputs_length::binary, unspent_outputs_bin::binary>> + more_bit = if more?, do: 1, else: 0 + + <<250::8, encoded_unspent_outputs_length::binary, unspent_outputs_bin::binary, more_bit::1, + VarInt.from_value(offset)::binary>> end def encode(%TransactionList{transactions: transactions, more?: false}) do @@ -667,7 +665,9 @@ defmodule Archethic.P2P.Message do def decode(<<5::8, rest::bitstring>>) do {address, rest} = Utils.deserialize_address(rest) - {%GetUnspentOutputs{address: address}, rest} + + {offset, rest} = VarInt.get_value(rest) + {%GetUnspentOutputs{address: address, offset: offset}, rest} end def decode(<<6::8, rest::bitstring>>) do @@ -1086,8 +1086,15 @@ defmodule Archethic.P2P.Message do def decode(<<250::8, rest::bitstring>>) do {nb_unspent_outputs, rest} = rest |> VarInt.get_value() - {unspent_outputs, rest} = deserialize_unspent_output_list(rest, nb_unspent_outputs, []) - {%UnspentOutputList{unspent_outputs: unspent_outputs}, rest} + + {unspent_outputs, <<more_bit::1, rest::bitstring>>} = + deserialize_unspent_output_list(rest, nb_unspent_outputs, []) + + more?
= more_bit == 1 + + {offset, rest} = VarInt.get_value(rest) + + {%UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}, rest} end def decode(<<251::8, rest::bitstring>>) do @@ -1276,9 +1283,40 @@ defmodule Archethic.P2P.Message do %TransactionList{transactions: chain, paging_state: paging_state, more?: more?} end - def process(%GetUnspentOutputs{address: tx_address}) do + def process(%GetUnspentOutputs{address: tx_address, offset: offset}) do + utxos = Account.get_unspent_outputs(tx_address) + utxos_length = length(utxos) + + %{utxos: utxos, offset: offset, more?: more?} = + utxos + |> Enum.with_index() + |> Enum.drop(offset) + |> Enum.reduce_while(%{utxos: [], offset: 0, more?: false}, fn {utxo, index}, acc -> + acc_size = + acc.utxos + |> Enum.map(&UnspentOutput.serialize/1) + |> :erlang.list_to_binary() + |> byte_size() + + utxo_size = UnspentOutput.serialize(utxo) |> byte_size + + if acc_size + utxo_size < 3_000_000 do + new_acc = + acc + |> Map.update!(:utxos, &[utxo | &1]) + |> Map.put(:offset, index + 1) + |> Map.put(:more?, index + 1 < utxos_length) + + {:cont, new_acc} + else + {:halt, acc} + end + end) + %UnspentOutputList{ - unspent_outputs: Account.get_unspent_outputs(tx_address) + unspent_outputs: utxos, + offset: offset, + more?: more? 
} end diff --git a/lib/archethic/p2p/message/get_unspent_outputs.ex b/lib/archethic/p2p/message/get_unspent_outputs.ex index 3b87a8df70..9bf3485574 100644 --- a/lib/archethic/p2p/message/get_unspent_outputs.ex +++ b/lib/archethic/p2p/message/get_unspent_outputs.ex @@ -3,11 +3,12 @@ defmodule Archethic.P2P.Message.GetUnspentOutputs do Represents a message to request the list of unspent outputs from a transaction """ @enforce_keys [:address] - defstruct [:address] + defstruct [:address, offset: 0] alias Archethic.Crypto @type t :: %__MODULE__{ - address: Crypto.versioned_hash() + address: Crypto.versioned_hash(), + offset: non_neg_integer() } end diff --git a/lib/archethic/p2p/message/unspent_output_list.ex b/lib/archethic/p2p/message/unspent_output_list.ex index a41df627f6..f769814223 100644 --- a/lib/archethic/p2p/message/unspent_output_list.ex +++ b/lib/archethic/p2p/message/unspent_output_list.ex @@ -2,11 +2,13 @@ defmodule Archethic.P2P.Message.UnspentOutputList do @moduledoc """ Represents a message with a list of unspent outputs """ - defstruct unspent_outputs: [] + defstruct unspent_outputs: [], more?: false, offset: 0 alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput @type t :: %__MODULE__{ - unspent_outputs: list(UnspentOutput.t()) + unspent_outputs: list(UnspentOutput.t()), + more?: boolean(), + offset: non_neg_integer() } end diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 7fd2d72610..38f8d4b2ac 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -755,20 +755,38 @@ defmodule Archethic.TransactionChain do end @doc """ - Fetch the transaction unspent outputs for a transaction address at a given time - - If the utxo exist, then they are returned in the shape of `{:ok, inputs}`. - If no nodes are able to answer the request, `{:error, :network_issue}` is returned. 
+ Stream the transaction unspent outputs for a transaction address """ - @spec fetch_unspent_outputs_remotely( + @spec stream_unspent_outputs_remotely( address :: Crypto.versioned_hash(), list(Node.t()) - ) :: - {:ok, list(UnspentOutput.t())} | {:error, :network_issue} - def fetch_unspent_outputs_remotely(_, []), do: {:ok, []} + ) :: Enumerable.t() | list(UnspentOutput.t()) + def stream_unspent_outputs_remotely(_, []), do: [] - def fetch_unspent_outputs_remotely(address, nodes) + def stream_unspent_outputs_remotely(address, nodes) when is_binary(address) and is_list(nodes) do + Stream.resource( + fn -> fetch_unspent_outputs_remotely(address, nodes, 0) end, + fn + {utxos, true, offset} -> + {utxos, fetch_unspent_outputs_remotely(address, nodes, offset)} + + {utxos, false, _} -> + {utxos, :eof} + + :eof -> + {:halt, nil} + end, + fn _ -> :ok end + ) + end + + @doc """ + Fetch the unspent outputs + """ + @spec fetch_unspent_outputs_remotely(binary(), list(Node.t()), non_neg_integer()) :: + {list(UnspentOutput.t()), boolean(), non_neg_integer() | nil} + def fetch_unspent_outputs_remotely(address, nodes, offset) do conflict_resolver = fn results -> results |> Enum.sort_by(&length(&1.unspent_outputs), :desc) @@ -777,14 +795,14 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, - %GetUnspentOutputs{address: address}, + %GetUnspentOutputs{address: address, offset: offset}, conflict_resolver ) do - {:ok, %UnspentOutputList{unspent_outputs: unspent_outputs}} -> - {:ok, unspent_outputs} + {:ok, %UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}} -> + {unspent_outputs, more?, offset} {:error, :network_issue} -> - {:error, :network_issue} + {[], false, nil} end end