Skip to content

Commit

Permalink
Support stream of UTXO
Browse files Browse the repository at this point in the history
The aim is to reduce timeouts by streaming into batches  < 3MB.
This would be used to retrieve all the UTXO in the transaction context phase of the mining.
  • Loading branch information
samuelmanzanera committed Sep 14, 2022
1 parent ed1e678 commit b964f8e
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 51 deletions.
27 changes: 7 additions & 20 deletions lib/archethic/mining/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,11 @@ defmodule Archethic.Mining.TransactionContext do
|> Enum.uniq()

prev_tx_task = request_previous_tx(previous_address, prev_tx_nodes_split)
utxo_task = request_utxo(previous_address, unspent_outputs_nodes_split)
nodes_view_task = request_nodes_view(node_public_keys)

prev_tx = Task.await(prev_tx_task, Message.get_max_timeout())
utxos = Task.await(utxo_task)
nodes_view = Task.await(nodes_view_task)

# involved_nodes =
# [prev_tx_node_involved, utxo_node_involved]
# |> Enum.filter(& &1)
# |> P2P.distinct_nodes()

%{
chain_nodes_view: chain_storage_nodes_view,
beacon_nodes_view: beacon_storage_nodes_view,
Expand All @@ -83,7 +76,13 @@ defmodule Archethic.Mining.TransactionContext do
io_storage_node_public_keys
)

# TODO: remove the invovled nodes as not used anymore
utxos =
TransactionChain.stream_unspent_outputs_remotely(
previous_address,
unspent_outputs_nodes_split
)
|> Enum.to_list()

{prev_tx, utxos, [], chain_storage_nodes_view, beacon_storage_nodes_view,
io_storage_nodes_view}
end
Expand Down Expand Up @@ -117,18 +116,6 @@ defmodule Archethic.Mining.TransactionContext do
end)
end

defp request_utxo(previous_address, nodes) do
Task.Supervisor.async(TaskSupervisor, fn ->
{:ok, utxos} =
TransactionChain.fetch_unspent_outputs_remotely(
previous_address,
nodes
)

utxos
end)
end

defp request_nodes_view(node_public_keys) do
Task.Supervisor.async(TaskSupervisor, fn ->
Task.Supervisor.async_stream_nolink(
Expand Down
66 changes: 52 additions & 14 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,6 @@ defmodule Archethic.P2P.Message do
@spec get_timeout(__MODULE__.t()) :: non_neg_integer()
def get_timeout(%GetTransaction{}), do: get_max_timeout()
def get_timeout(%GetLastTransaction{}), do: get_max_timeout()
def get_timeout(%NewTransaction{}), do: get_max_timeout()
def get_timeout(%StartMining{}), do: get_max_timeout()
def get_timeout(%ReplicateTransaction{}), do: get_max_timeout()
def get_timeout(%ReplicateTransactionChain{}), do: get_max_timeout()

def get_timeout(%GetTransactionChain{}) do
# As we use 10 transaction in the pagination we can estimate the max time
Expand All @@ -191,7 +187,6 @@ defmodule Archethic.P2P.Message do
# length(addresses) * trunc(beacon_summary_high_estimation_bytes / @floor_upload_speed * 1000)
# end

def get_timeout(%GetUnspentOutputs{}), do: get_max_timeout()
def get_timeout(%GetTransactionInputs{}), do: get_max_timeout()

def get_timeout(_), do: 3_000
Expand Down Expand Up @@ -249,8 +244,8 @@ defmodule Archethic.P2P.Message do
<<4::8, tx_address::binary, byte_size(paging_state)::8, paging_state::binary>>
end

def encode(%GetUnspentOutputs{address: tx_address}) do
<<5::8, tx_address::binary>>
def encode(%GetUnspentOutputs{address: tx_address, offset: offset}) do
<<5::8, tx_address::binary, VarInt.from_value(offset)::binary>>
end

def encode(%NewTransaction{transaction: tx}) do
Expand Down Expand Up @@ -560,7 +555,7 @@ defmodule Archethic.P2P.Message do
<<249::8, encoded_nodes_length::binary, nodes_bin::bitstring>>
end

def encode(%UnspentOutputList{unspent_outputs: unspent_outputs}) do
def encode(%UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}) do
unspent_outputs_bin =
unspent_outputs
|> Stream.map(&UnspentOutput.serialize/1)
Expand All @@ -569,7 +564,10 @@ defmodule Archethic.P2P.Message do

encoded_unspent_outputs_length = Enum.count(unspent_outputs) |> VarInt.from_value()

<<250::8, encoded_unspent_outputs_length::binary, unspent_outputs_bin::binary>>
more_bit = if more?, do: 1, else: 0

<<250::8, encoded_unspent_outputs_length::binary, unspent_outputs_bin::binary, more_bit::1,
VarInt.from_value(offset)::binary>>
end

def encode(%TransactionList{transactions: transactions, more?: false}) do
Expand Down Expand Up @@ -667,7 +665,9 @@ defmodule Archethic.P2P.Message do

def decode(<<5::8, rest::bitstring>>) do
{address, rest} = Utils.deserialize_address(rest)
{%GetUnspentOutputs{address: address}, rest}

{offset, rest} = VarInt.get_value(rest)
{%GetUnspentOutputs{address: address, offset: offset}, rest}
end

def decode(<<6::8, rest::bitstring>>) do
Expand Down Expand Up @@ -1086,8 +1086,15 @@ defmodule Archethic.P2P.Message do

def decode(<<250::8, rest::bitstring>>) do
{nb_unspent_outputs, rest} = rest |> VarInt.get_value()
{unspent_outputs, rest} = deserialize_unspent_output_list(rest, nb_unspent_outputs, [])
{%UnspentOutputList{unspent_outputs: unspent_outputs}, rest}

{unspent_outputs, <<more_bit::1, rest::bitstring>>} =
deserialize_unspent_output_list(rest, nb_unspent_outputs, [])

more? = more_bit == 1

{offset, rest} = VarInt.get_value(rest)

{%UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}, rest}
end

def decode(<<251::8, rest::bitstring>>) do
Expand Down Expand Up @@ -1276,9 +1283,40 @@ defmodule Archethic.P2P.Message do
%TransactionList{transactions: chain, paging_state: paging_state, more?: more?}
end

def process(%GetUnspentOutputs{address: tx_address}) do
def process(%GetUnspentOutputs{address: tx_address, offset: offset}) do
utxos = Account.get_unspent_outputs(tx_address)
utxos_length = length(utxos)

%{utxos: utxos, offset: offset, more?: more?} =
utxos
|> Enum.with_index()
|> Enum.drop(offset)
|> Enum.reduce_while(%{utxos: [], offset: 0, more?: false}, fn {utxo, index}, acc ->
acc_size =
acc.utxos
|> Enum.map(&UnspentOutput.serialize/1)
|> :erlang.list_to_binary()
|> byte_size()

utxo_size = UnspentOutput.serialize(utxo) |> byte_size

if acc_size + utxo_size < 3_000_000 do
new_acc =
acc
|> Map.update!(:utxos, &[utxo | &1])
|> Map.put(:offset, index + 1)
|> Map.put(:more?, index + 1 < utxos_length)

{:cont, new_acc}
else
{:halt, acc}
end
end)

%UnspentOutputList{
unspent_outputs: Account.get_unspent_outputs(tx_address)
unspent_outputs: utxos,
offset: offset,
more?: more?
}
end

Expand Down
5 changes: 3 additions & 2 deletions lib/archethic/p2p/message/get_unspent_outputs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ defmodule Archethic.P2P.Message.GetUnspentOutputs do
Represents a message to request the list of unspent outputs from a transaction
"""
@enforce_keys [:address]
defstruct [:address]
defstruct [:address, offset: 0]

alias Archethic.Crypto

@type t :: %__MODULE__{
address: Crypto.versioned_hash()
address: Crypto.versioned_hash(),
offset: non_neg_integer()
}
end
6 changes: 4 additions & 2 deletions lib/archethic/p2p/message/unspent_output_list.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ defmodule Archethic.P2P.Message.UnspentOutputList do
@moduledoc """
Represents a message with a list of unspent outputs
"""
defstruct unspent_outputs: []
defstruct unspent_outputs: [], more?: false, offset: 0

alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput

@type t :: %__MODULE__{
unspent_outputs: list(UnspentOutput.t())
unspent_outputs: list(UnspentOutput.t()),
more?: boolean(),
offset: non_neg_integer()
}
end
44 changes: 31 additions & 13 deletions lib/archethic/transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -755,20 +755,38 @@ defmodule Archethic.TransactionChain do
end

@doc """
Fetch the transaction unspent outputs for a transaction address at a given time
If the utxo exist, then they are returned in the shape of `{:ok, inputs}`.
If no nodes are able to answer the request, `{:error, :network_issue}` is returned.
Stream the transaction unspent outputs for a transaction address
"""
@spec fetch_unspent_outputs_remotely(
@spec stream_unspent_outputs_remotely(
address :: Crypto.versioned_hash(),
list(Node.t())
) ::
{:ok, list(UnspentOutput.t())} | {:error, :network_issue}
def fetch_unspent_outputs_remotely(_, []), do: {:ok, []}
) :: Enumerable.t() | list(UnspentOutput.t())
def stream_unspent_outputs_remotely(_, []), do: []

def fetch_unspent_outputs_remotely(address, nodes)
def stream_unspent_outputs_remotely(address, nodes)
when is_binary(address) and is_list(nodes) do
Stream.resource(
fn -> fetch_unspent_outputs_remotely(address, nodes, 0) end,
fn
{utxos, true, offset} ->
{utxos, fetch_unspent_outputs_remotely(address, nodes, offset)}

{utxos, false, _} ->
{utxos, :eof}

:eof ->
{:halt, nil}
end,
fn _ -> :ok end
)
end

@doc """
Fetch the unspent outputs
"""
@spec fetch_unspent_outputs_remotely(binary(), list(Node.t()), non_neg_integer()) ::
{list(UnspentOutput.t()), boolean(), non_neg_integer() | nil}
def fetch_unspent_outputs_remotely(address, nodes, offset) do
conflict_resolver = fn results ->
results
|> Enum.sort_by(&length(&1.unspent_outputs), :desc)
Expand All @@ -777,14 +795,14 @@ defmodule Archethic.TransactionChain do

case P2P.quorum_read(
nodes,
%GetUnspentOutputs{address: address},
%GetUnspentOutputs{address: address, offset: offset},
conflict_resolver
) do
{:ok, %UnspentOutputList{unspent_outputs: unspent_outputs}} ->
{:ok, unspent_outputs}
{:ok, %UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}} ->
{unspent_outputs, more?, offset}

{:error, :network_issue} ->
{:error, :network_issue}
{[], false, nil}
end
end

Expand Down

0 comments on commit b964f8e

Please sign in to comment.