Skip to content

Commit

Permalink
Support UTXO & Tx Inputs streaming (#570)
Browse files Browse the repository at this point in the history
* Support stream of UTXO

The aim is to reduce timeouts by streaming into batches  < 3MB.
This would be used to retrieve all the UTXO in the transaction context phase of the mining.

* Support stream of tx inputs

It provides filtering to limit the number of inputs to return in graphql api while sorting them in descending order
  • Loading branch information
samuelmanzanera committed Sep 16, 2022
1 parent 627b203 commit c60dc07
Show file tree
Hide file tree
Showing 18 changed files with 324 additions and 136 deletions.
30 changes: 27 additions & 3 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,40 @@ defmodule Archethic do
@doc """
Request to fetch the inputs for a transaction address from the closest nodes
"""
@spec get_transaction_inputs(binary()) ::
{:ok, list(TransactionInput.t())} | {:error, :network_issue}
@spec get_transaction_inputs(binary()) :: list(TransactionInput.t())
def get_transaction_inputs(address) when is_binary(address) do
nodes =
address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now())
address
|> TransactionChain.stream_inputs_remotely(nodes, DateTime.utc_now())
|> Enum.to_list()
end

@doc """
Request to fetch the inputs for a transaction address from the closest nodes at a given page
"""
@spec get_transaction_inputs(
binary(),
paging_offset :: non_neg_integer(),
limit :: non_neg_integer()
) :: list(TransactionInput.t())
def get_transaction_inputs(address, page, limit)
when is_binary(address) and is_integer(page) and page >= 0 and is_integer(limit) and
limit >= 0 do
nodes =
address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

{inputs, _more?, _offset} =
TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now(), page, limit)

inputs
end

@doc """
Expand Down
27 changes: 7 additions & 20 deletions lib/archethic/mining/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,11 @@ defmodule Archethic.Mining.TransactionContext do
|> Enum.uniq()

prev_tx_task = request_previous_tx(previous_address, prev_tx_nodes_split)
utxo_task = request_utxo(previous_address, unspent_outputs_nodes_split)
nodes_view_task = request_nodes_view(node_public_keys)

prev_tx = Task.await(prev_tx_task, Message.get_max_timeout())
utxos = Task.await(utxo_task)
nodes_view = Task.await(nodes_view_task)

# involved_nodes =
# [prev_tx_node_involved, utxo_node_involved]
# |> Enum.filter(& &1)
# |> P2P.distinct_nodes()

%{
chain_nodes_view: chain_storage_nodes_view,
beacon_nodes_view: beacon_storage_nodes_view,
Expand All @@ -83,7 +76,13 @@ defmodule Archethic.Mining.TransactionContext do
io_storage_node_public_keys
)

# TODO: remove the invovled nodes as not used anymore
utxos =
TransactionChain.stream_unspent_outputs_remotely(
previous_address,
unspent_outputs_nodes_split
)
|> Enum.to_list()

{prev_tx, utxos, [], chain_storage_nodes_view, beacon_storage_nodes_view,
io_storage_nodes_view}
end
Expand Down Expand Up @@ -117,18 +116,6 @@ defmodule Archethic.Mining.TransactionContext do
end)
end

defp request_utxo(previous_address, nodes) do
Task.Supervisor.async(TaskSupervisor, fn ->
{:ok, utxos} =
TransactionChain.fetch_unspent_outputs_remotely(
previous_address,
nodes
)

utxos
end)
end

defp request_nodes_view(node_public_keys) do
Task.Supervisor.async(TaskSupervisor, fn ->
Task.Supervisor.async_stream_nolink(
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/networking/ip_lookup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule Archethic.Networking.IPLookup do
ip

{:error, reason} ->
fallback(RemoteDiscovery, reason)
raise "Cannot use remote discovery IP lookup - #{inspect(reason)}"
end
end

Expand Down
138 changes: 117 additions & 21 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,6 @@ defmodule Archethic.P2P.Message do
# length(addresses) * trunc(beacon_summary_high_estimation_bytes / @floor_upload_speed * 1000)
# end

def get_timeout(%GetUnspentOutputs{}), do: get_max_timeout()
def get_timeout(%GetTransactionInputs{}), do: get_max_timeout()

def get_timeout(_), do: 3_000

@doc """
Expand Down Expand Up @@ -249,8 +246,8 @@ defmodule Archethic.P2P.Message do
<<4::8, tx_address::binary, byte_size(paging_state)::8, paging_state::binary>>
end

def encode(%GetUnspentOutputs{address: tx_address}) do
<<5::8, tx_address::binary>>
def encode(%GetUnspentOutputs{address: tx_address, offset: offset}) do
<<5::8, tx_address::binary, VarInt.from_value(offset)::binary>>
end

def encode(%NewTransaction{transaction: tx}) do
Expand Down Expand Up @@ -351,8 +348,9 @@ defmodule Archethic.P2P.Message do
<<16::8, address::binary>>
end

def encode(%GetTransactionInputs{address: address}) do
<<17::8, address::binary>>
def encode(%GetTransactionInputs{address: address, offset: offset, limit: limit}) do
<<17::8, address::binary, VarInt.from_value(offset)::binary,
VarInt.from_value(limit)::binary>>
end

def encode(%GetTransactionChainLength{address: address}) do
Expand Down Expand Up @@ -494,7 +492,7 @@ defmodule Archethic.P2P.Message do
<<243::8, bit_size(view)::8, view::bitstring>>
end

def encode(%TransactionInputList{inputs: inputs}) do
def encode(%TransactionInputList{inputs: inputs, more?: more?, offset: offset}) do
inputs_bin =
inputs
|> Stream.map(&TransactionInput.serialize/1)
Expand All @@ -503,7 +501,10 @@ defmodule Archethic.P2P.Message do

encoded_inputs_length = length(inputs) |> VarInt.from_value()

<<244::8, encoded_inputs_length::binary, inputs_bin::bitstring>>
more_bit = if more?, do: 1, else: 0

<<244::8, encoded_inputs_length::binary, inputs_bin::bitstring, more_bit::1,
VarInt.from_value(offset)::binary>>
end

def encode(%TransactionChainLength{length: length}) do
Expand Down Expand Up @@ -560,7 +561,7 @@ defmodule Archethic.P2P.Message do
<<249::8, encoded_nodes_length::binary, nodes_bin::bitstring>>
end

def encode(%UnspentOutputList{unspent_outputs: unspent_outputs}) do
def encode(%UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}) do
unspent_outputs_bin =
unspent_outputs
|> Stream.map(&UnspentOutput.serialize/1)
Expand All @@ -569,7 +570,10 @@ defmodule Archethic.P2P.Message do

encoded_unspent_outputs_length = Enum.count(unspent_outputs) |> VarInt.from_value()

<<250::8, encoded_unspent_outputs_length::binary, unspent_outputs_bin::binary>>
more_bit = if more?, do: 1, else: 0

<<250::8, encoded_unspent_outputs_length::binary, unspent_outputs_bin::binary, more_bit::1,
VarInt.from_value(offset)::binary>>
end

def encode(%TransactionList{transactions: transactions, more?: false}) do
Expand Down Expand Up @@ -667,7 +671,9 @@ defmodule Archethic.P2P.Message do

def decode(<<5::8, rest::bitstring>>) do
{address, rest} = Utils.deserialize_address(rest)
{%GetUnspentOutputs{address: address}, rest}

{offset, rest} = VarInt.get_value(rest)
{%GetUnspentOutputs{address: address, offset: offset}, rest}
end

def decode(<<6::8, rest::bitstring>>) do
Expand Down Expand Up @@ -823,7 +829,9 @@ defmodule Archethic.P2P.Message do

def decode(<<17::8, rest::bitstring>>) do
{address, rest} = Utils.deserialize_address(rest)
{%GetTransactionInputs{address: address}, rest}
{offset, rest} = VarInt.get_value(rest)
{limit, rest} = VarInt.get_value(rest)
{%GetTransactionInputs{address: address, offset: offset, limit: limit}, rest}
end

def decode(<<18::8, rest::bitstring>>) do
Expand Down Expand Up @@ -1034,10 +1042,18 @@ defmodule Archethic.P2P.Message do

def decode(<<244::8, rest::bitstring>>) do
{nb_inputs, rest} = rest |> VarInt.get_value()
{inputs, rest} = deserialize_transaction_inputs(rest, nb_inputs, [])

{inputs, <<more_bit::1, rest::bitstring>>} =
deserialize_transaction_inputs(rest, nb_inputs, [])

more? = more_bit == 1

{offset, rest} = VarInt.get_value(rest)

{%TransactionInputList{
inputs: inputs
inputs: inputs,
more?: more?,
offset: offset
}, rest}
end

Expand Down Expand Up @@ -1086,8 +1102,15 @@ defmodule Archethic.P2P.Message do

def decode(<<250::8, rest::bitstring>>) do
{nb_unspent_outputs, rest} = rest |> VarInt.get_value()
{unspent_outputs, rest} = deserialize_unspent_output_list(rest, nb_unspent_outputs, [])
{%UnspentOutputList{unspent_outputs: unspent_outputs}, rest}

{unspent_outputs, <<more_bit::1, rest::bitstring>>} =
deserialize_unspent_output_list(rest, nb_unspent_outputs, [])

more? = more_bit == 1

{offset, rest} = VarInt.get_value(rest)

{%UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}, rest}
end

def decode(<<251::8, rest::bitstring>>) do
Expand Down Expand Up @@ -1276,9 +1299,41 @@ defmodule Archethic.P2P.Message do
%TransactionList{transactions: chain, paging_state: paging_state, more?: more?}
end

def process(%GetUnspentOutputs{address: tx_address}) do
def process(%GetUnspentOutputs{address: tx_address, offset: offset}) do
utxos = Account.get_unspent_outputs(tx_address)
utxos_length = length(utxos)

%{utxos: utxos, offset: offset, more?: more?} =
utxos
|> Enum.sort_by(& &1.timestamp, {:desc, DateTime})
|> Enum.with_index()
|> Enum.drop(offset)
|> Enum.reduce_while(%{utxos: [], offset: 0, more?: false}, fn {utxo, index}, acc ->
acc_size =
acc.utxos
|> Enum.map(&UnspentOutput.serialize/1)
|> :erlang.list_to_binary()
|> byte_size()

utxo_size = UnspentOutput.serialize(utxo) |> byte_size

if acc_size + utxo_size < 3_000_000 do
new_acc =
acc
|> Map.update!(:utxos, &[utxo | &1])
|> Map.put(:offset, index + 1)
|> Map.put(:more?, index + 1 < utxos_length)

{:cont, new_acc}
else
{:halt, acc}
end
end)

%UnspentOutputList{
unspent_outputs: Account.get_unspent_outputs(tx_address)
unspent_outputs: utxos,
offset: offset,
more?: more?
}
end

Expand Down Expand Up @@ -1447,16 +1502,57 @@ defmodule Archethic.P2P.Message do
}
end

def process(%GetTransactionInputs{address: address}) do
def process(%GetTransactionInputs{address: address, offset: offset, limit: limit}) do
contract_inputs =
address
|> Contracts.list_contract_transactions()
|> Enum.map(fn {address, timestamp} ->
%TransactionInput{from: address, type: :call, timestamp: timestamp}
end)

inputs = Account.get_inputs(address) ++ contract_inputs
inputs_length = length(inputs)

%{inputs: inputs, offset: offset, more?: more?} =
inputs
|> Enum.sort_by(& &1.timestamp, {:desc, DateTime})
|> Enum.with_index()
|> Enum.drop(offset)
|> Enum.reduce_while(%{inputs: [], offset: 0, more?: false}, fn {input, index}, acc ->
acc_size =
acc.inputs
|> Enum.map(&TransactionInput.serialize/1)
|> :erlang.list_to_bitstring()
|> byte_size()

input_size = TransactionInput.serialize(input) |> byte_size

size_capacity? = acc_size + input_size < 3_000_000

should_take_more? =
if limit > 0 do
length(acc.inputs) < limit and size_capacity?
else
size_capacity?
end

if should_take_more? do
new_acc =
acc
|> Map.update!(:inputs, &[input | &1])
|> Map.put(:offset, index + 1)
|> Map.put(:more?, index + 1 < inputs_length)

{:cont, new_acc}
else
{:halt, acc}
end
end)

%TransactionInputList{
inputs: Account.get_inputs(address) ++ contract_inputs
inputs: Enum.reverse(inputs),
more?: more?,
offset: offset
}
end

Expand Down
6 changes: 4 additions & 2 deletions lib/archethic/p2p/message/get_transaction_inputs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ defmodule Archethic.P2P.Message.GetTransactionInputs do
Represents a message with to request the inputs (spent or unspents) from a transaction
"""
@enforce_keys [:address]
defstruct [:address]
defstruct [:address, offset: 0, limit: 0]

alias Archethic.Crypto

@type t :: %__MODULE__{
address: Crypto.versioned_hash()
address: Crypto.versioned_hash(),
offset: non_neg_integer(),
limit: non_neg_integer()
}
end
5 changes: 3 additions & 2 deletions lib/archethic/p2p/message/get_unspent_outputs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ defmodule Archethic.P2P.Message.GetUnspentOutputs do
Represents a message to request the list of unspent outputs from a transaction
"""
@enforce_keys [:address]
defstruct [:address]
defstruct [:address, offset: 0]

alias Archethic.Crypto

@type t :: %__MODULE__{
address: Crypto.versioned_hash()
address: Crypto.versioned_hash(),
offset: non_neg_integer()
}
end
6 changes: 4 additions & 2 deletions lib/archethic/p2p/message/transaction_input_list.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ defmodule Archethic.P2P.Message.TransactionInputList do
@moduledoc """
Represents a message with a list of transaction inputs
"""
defstruct inputs: []
defstruct inputs: [], more?: false, offset: 0

alias Archethic.TransactionChain.TransactionInput

@type t() :: %__MODULE__{
inputs: list(TransactionInput.t())
inputs: list(TransactionInput.t()),
more?: boolean(),
offset: non_neg_integer()
}
end
Loading

0 comments on commit c60dc07

Please sign in to comment.