Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support UTXO & Tx Inputs streaming #570

Merged
merged 6 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,35 @@ defmodule Archethic do
@doc """
Request to fetch the inputs for a transaction address from the closest nodes
"""
@spec get_transaction_inputs(binary()) ::
{:ok, list(TransactionInput.t())} | {:error, :network_issue}
@spec get_transaction_inputs(binary()) :: list(TransactionInput.t())
def get_transaction_inputs(address) when is_binary(address) do
nodes =
address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now())
address
|> TransactionChain.stream_inputs_remotely(nodes, DateTime.utc_now())
|> Enum.to_list()
end

@doc """
Request to fetch the inputs for a transaction address from the closest nodes at a given page
"""
@spec get_transaction_inputs(binary(), non_neg_integer()) :: list(TransactionInput.t())
def get_transaction_inputs(address, page)
when is_binary(address) and is_integer(page) and page >= 0 do
nodes =
address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

{inputs, _more?, _offset} =
TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now(), page)

inputs
end

@doc """
Expand Down
27 changes: 7 additions & 20 deletions lib/archethic/mining/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,11 @@ defmodule Archethic.Mining.TransactionContext do
|> Enum.uniq()

prev_tx_task = request_previous_tx(previous_address, prev_tx_nodes_split)
utxo_task = request_utxo(previous_address, unspent_outputs_nodes_split)
nodes_view_task = request_nodes_view(node_public_keys)

prev_tx = Task.await(prev_tx_task, Message.get_max_timeout())
utxos = Task.await(utxo_task)
nodes_view = Task.await(nodes_view_task)

# involved_nodes =
# [prev_tx_node_involved, utxo_node_involved]
# |> Enum.filter(& &1)
# |> P2P.distinct_nodes()

%{
chain_nodes_view: chain_storage_nodes_view,
beacon_nodes_view: beacon_storage_nodes_view,
Expand All @@ -83,7 +76,13 @@ defmodule Archethic.Mining.TransactionContext do
io_storage_node_public_keys
)

# TODO: remove the invovled nodes as not used anymore
utxos =
TransactionChain.stream_unspent_outputs_remotely(
previous_address,
unspent_outputs_nodes_split
)
|> Enum.to_list()

{prev_tx, utxos, [], chain_storage_nodes_view, beacon_storage_nodes_view,
io_storage_nodes_view}
end
Expand Down Expand Up @@ -117,18 +116,6 @@ defmodule Archethic.Mining.TransactionContext do
end)
end

defp request_utxo(previous_address, nodes) do
Task.Supervisor.async(TaskSupervisor, fn ->
{:ok, utxos} =
TransactionChain.fetch_unspent_outputs_remotely(
previous_address,
nodes
)

utxos
end)
end

defp request_nodes_view(node_public_keys) do
Task.Supervisor.async(TaskSupervisor, fn ->
Task.Supervisor.async_stream_nolink(
Expand Down
129 changes: 104 additions & 25 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,6 @@ defmodule Archethic.P2P.Message do
@spec get_timeout(__MODULE__.t()) :: non_neg_integer()
def get_timeout(%GetTransaction{}), do: get_max_timeout()
def get_timeout(%GetLastTransaction{}), do: get_max_timeout()
def get_timeout(%NewTransaction{}), do: get_max_timeout()
samuelmanzanera marked this conversation as resolved.
Show resolved Hide resolved
def get_timeout(%StartMining{}), do: get_max_timeout()
def get_timeout(%ReplicateTransaction{}), do: get_max_timeout()
def get_timeout(%ReplicateTransactionChain{}), do: get_max_timeout()

def get_timeout(%GetTransactionChain{}) do
# As we use 10 transaction in the pagination we can estimate the max time
Expand All @@ -191,9 +187,6 @@ defmodule Archethic.P2P.Message do
# length(addresses) * trunc(beacon_summary_high_estimation_bytes / @floor_upload_speed * 1000)
# end

def get_timeout(%GetUnspentOutputs{}), do: get_max_timeout()
def get_timeout(%GetTransactionInputs{}), do: get_max_timeout()

def get_timeout(_), do: 3_000

@doc """
Expand Down Expand Up @@ -249,8 +242,8 @@ defmodule Archethic.P2P.Message do
<<4::8, tx_address::binary, byte_size(paging_state)::8, paging_state::binary>>
end

def encode(%GetUnspentOutputs{address: tx_address}) do
<<5::8, tx_address::binary>>
def encode(%GetUnspentOutputs{address: tx_address, offset: offset}) do
<<5::8, tx_address::binary, VarInt.from_value(offset)::binary>>
end

def encode(%NewTransaction{transaction: tx}) do
Expand Down Expand Up @@ -351,8 +344,8 @@ defmodule Archethic.P2P.Message do
<<16::8, address::binary>>
end

def encode(%GetTransactionInputs{address: address}) do
<<17::8, address::binary>>
def encode(%GetTransactionInputs{address: address, offset: offset}) do
<<17::8, address::binary, VarInt.from_value(offset)::binary>>
end

def encode(%GetTransactionChainLength{address: address}) do
Expand Down Expand Up @@ -494,7 +487,7 @@ defmodule Archethic.P2P.Message do
<<243::8, bit_size(view)::8, view::bitstring>>
end

def encode(%TransactionInputList{inputs: inputs}) do
def encode(%TransactionInputList{inputs: inputs, more?: more?, offset: offset}) do
inputs_bin =
inputs
|> Stream.map(&TransactionInput.serialize/1)
Expand All @@ -503,7 +496,10 @@ defmodule Archethic.P2P.Message do

encoded_inputs_length = length(inputs) |> VarInt.from_value()

<<244::8, encoded_inputs_length::binary, inputs_bin::bitstring>>
more_bit = if more?, do: 1, else: 0

<<244::8, encoded_inputs_length::binary, inputs_bin::bitstring, more_bit::1,
VarInt.from_value(offset)::binary>>
end

def encode(%TransactionChainLength{length: length}) do
Expand Down Expand Up @@ -560,7 +556,7 @@ defmodule Archethic.P2P.Message do
<<249::8, encoded_nodes_length::binary, nodes_bin::bitstring>>
end

def encode(%UnspentOutputList{unspent_outputs: unspent_outputs}) do
def encode(%UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}) do
unspent_outputs_bin =
unspent_outputs
|> Stream.map(&UnspentOutput.serialize/1)
Expand All @@ -569,7 +565,10 @@ defmodule Archethic.P2P.Message do

encoded_unspent_outputs_length = Enum.count(unspent_outputs) |> VarInt.from_value()

<<250::8, encoded_unspent_outputs_length::binary, unspent_outputs_bin::binary>>
more_bit = if more?, do: 1, else: 0

<<250::8, encoded_unspent_outputs_length::binary, unspent_outputs_bin::binary, more_bit::1,
VarInt.from_value(offset)::binary>>
end

def encode(%TransactionList{transactions: transactions, more?: false}) do
Expand Down Expand Up @@ -667,7 +666,9 @@ defmodule Archethic.P2P.Message do

def decode(<<5::8, rest::bitstring>>) do
{address, rest} = Utils.deserialize_address(rest)
{%GetUnspentOutputs{address: address}, rest}

{offset, rest} = VarInt.get_value(rest)
{%GetUnspentOutputs{address: address, offset: offset}, rest}
end

def decode(<<6::8, rest::bitstring>>) do
Expand Down Expand Up @@ -823,7 +824,8 @@ defmodule Archethic.P2P.Message do

def decode(<<17::8, rest::bitstring>>) do
{address, rest} = Utils.deserialize_address(rest)
{%GetTransactionInputs{address: address}, rest}
{offset, rest} = VarInt.get_value(rest)
{%GetTransactionInputs{address: address, offset: offset}, rest}
end

def decode(<<18::8, rest::bitstring>>) do
Expand Down Expand Up @@ -1034,10 +1036,18 @@ defmodule Archethic.P2P.Message do

def decode(<<244::8, rest::bitstring>>) do
{nb_inputs, rest} = rest |> VarInt.get_value()
{inputs, rest} = deserialize_transaction_inputs(rest, nb_inputs, [])

{inputs, <<more_bit::1, rest::bitstring>>} =
deserialize_transaction_inputs(rest, nb_inputs, [])

more? = more_bit == 1

{offset, rest} = VarInt.get_value(rest)

{%TransactionInputList{
inputs: inputs
inputs: inputs,
more?: more?,
offset: offset
}, rest}
end

Expand Down Expand Up @@ -1086,8 +1096,15 @@ defmodule Archethic.P2P.Message do

def decode(<<250::8, rest::bitstring>>) do
{nb_unspent_outputs, rest} = rest |> VarInt.get_value()
{unspent_outputs, rest} = deserialize_unspent_output_list(rest, nb_unspent_outputs, [])
{%UnspentOutputList{unspent_outputs: unspent_outputs}, rest}

{unspent_outputs, <<more_bit::1, rest::bitstring>>} =
deserialize_unspent_output_list(rest, nb_unspent_outputs, [])

more? = more_bit == 1

{offset, rest} = VarInt.get_value(rest)

{%UnspentOutputList{unspent_outputs: unspent_outputs, more?: more?, offset: offset}, rest}
end

def decode(<<251::8, rest::bitstring>>) do
Expand Down Expand Up @@ -1276,9 +1293,40 @@ defmodule Archethic.P2P.Message do
%TransactionList{transactions: chain, paging_state: paging_state, more?: more?}
end

def process(%GetUnspentOutputs{address: tx_address}) do
def process(%GetUnspentOutputs{address: tx_address, offset: offset}) do
utxos = Account.get_unspent_outputs(tx_address)
utxos_length = length(utxos)

%{utxos: utxos, offset: offset, more?: more?} =
utxos
|> Enum.with_index()
|> Enum.drop(offset)
|> Enum.reduce_while(%{utxos: [], offset: 0, more?: false}, fn {utxo, index}, acc ->
acc_size =
acc.utxos
|> Enum.map(&UnspentOutput.serialize/1)
|> :erlang.list_to_binary()
|> byte_size()

utxo_size = UnspentOutput.serialize(utxo) |> byte_size

if acc_size + utxo_size < 3_000_000 do
new_acc =
acc
|> Map.update!(:utxos, &[utxo | &1])
|> Map.put(:offset, index + 1)
|> Map.put(:more?, index + 1 < utxos_length)

{:cont, new_acc}
else
{:halt, acc}
end
end)

%UnspentOutputList{
unspent_outputs: Account.get_unspent_outputs(tx_address)
unspent_outputs: utxos,
offset: offset,
more?: more?
}
end

Expand Down Expand Up @@ -1447,16 +1495,47 @@ defmodule Archethic.P2P.Message do
}
end

def process(%GetTransactionInputs{address: address}) do
def process(%GetTransactionInputs{address: address, offset: offset}) do
contract_inputs =
address
|> Contracts.list_contract_transactions()
|> Enum.map(fn {address, timestamp} ->
%TransactionInput{from: address, type: :call, timestamp: timestamp}
end)

inputs = Account.get_inputs(address) ++ contract_inputs
inputs_length = length(inputs)

%{inputs: inputs, offset: offset, more?: more?} =
inputs
|> Enum.with_index()
|> Enum.drop(offset)
|> Enum.reduce_while(%{inputs: [], offset: 0, more?: false}, fn {input, index}, acc ->
acc_size =
acc.inputs
|> Enum.map(&TransactionInput.serialize/1)
|> :erlang.list_to_bitstring()
|> byte_size()

input_size = TransactionInput.serialize(input) |> byte_size

if acc_size + input_size < 3_000_000 do
new_acc =
acc
|> Map.update!(:inputs, &[input | &1])
|> Map.put(:offset, index + 1)
|> Map.put(:more?, index + 1 < inputs_length)

{:cont, new_acc}
else
{:halt, acc}
end
end)

%TransactionInputList{
inputs: Account.get_inputs(address) ++ contract_inputs
inputs: inputs,
more?: more?,
offset: offset
}
end

Expand Down
5 changes: 3 additions & 2 deletions lib/archethic/p2p/message/get_transaction_inputs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ defmodule Archethic.P2P.Message.GetTransactionInputs do
Represents a message with to request the inputs (spent or unspents) from a transaction
"""
@enforce_keys [:address]
defstruct [:address]
defstruct [:address, offset: 0]

alias Archethic.Crypto

@type t :: %__MODULE__{
address: Crypto.versioned_hash()
address: Crypto.versioned_hash(),
offset: non_neg_integer()
}
end
5 changes: 3 additions & 2 deletions lib/archethic/p2p/message/get_unspent_outputs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ defmodule Archethic.P2P.Message.GetUnspentOutputs do
Represents a message to request the list of unspent outputs from a transaction
"""
@enforce_keys [:address]
defstruct [:address]
defstruct [:address, offset: 0]

alias Archethic.Crypto

@type t :: %__MODULE__{
address: Crypto.versioned_hash()
address: Crypto.versioned_hash(),
offset: non_neg_integer()
}
end
6 changes: 4 additions & 2 deletions lib/archethic/p2p/message/transaction_input_list.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ defmodule Archethic.P2P.Message.TransactionInputList do
@moduledoc """
Represents a message with a list of transaction inputs
"""
defstruct inputs: []
defstruct inputs: [], more?: false, offset: 0

alias Archethic.TransactionChain.TransactionInput

@type t() :: %__MODULE__{
inputs: list(TransactionInput.t())
inputs: list(TransactionInput.t()),
more?: boolean(),
offset: non_neg_integer()
}
end
6 changes: 4 additions & 2 deletions lib/archethic/p2p/message/unspent_output_list.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ defmodule Archethic.P2P.Message.UnspentOutputList do
@moduledoc """
Represents a message with a list of unspent outputs
"""
defstruct unspent_outputs: []
defstruct unspent_outputs: [], more?: false, offset: 0

alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput

@type t :: %__MODULE__{
unspent_outputs: list(UnspentOutput.t())
unspent_outputs: list(UnspentOutput.t()),
more?: boolean(),
offset: non_neg_integer()
}
end
Loading