Skip to content

Commit

Permalink
Support stream of tx inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Sep 14, 2022
1 parent b964f8e commit 5c7cf69
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 54 deletions.
25 changes: 22 additions & 3 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,35 @@ defmodule Archethic do
@doc """
Request to fetch the inputs for a transaction address from the closest nodes
"""
@spec get_transaction_inputs(binary()) ::
{:ok, list(TransactionInput.t())} | {:error, :network_issue}
@spec get_transaction_inputs(binary()) :: list(TransactionInput.t())
def get_transaction_inputs(address) when is_binary(address) do
nodes =
address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now())
address
|> TransactionChain.stream_inputs_remotely(nodes, DateTime.utc_now())
|> Enum.to_list()
end

@doc """
Request to fetch the inputs for a transaction address from the closest nodes at a given page
"""
@spec get_transaction_inputs(binary(), non_neg_integer()) :: list(TransactionInput.t())
def get_transaction_inputs(address, page)
when is_binary(address) and is_integer(page) and page >= 0 do
nodes =
address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

{inputs, _more?, _offset} =
TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now(), page)

inputs
end

@doc """
Expand Down
63 changes: 52 additions & 11 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,6 @@ defmodule Archethic.P2P.Message do
# length(addresses) * trunc(beacon_summary_high_estimation_bytes / @floor_upload_speed * 1000)
# end

def get_timeout(%GetTransactionInputs{}), do: get_max_timeout()

def get_timeout(_), do: 3_000

@doc """
Expand Down Expand Up @@ -346,8 +344,8 @@ defmodule Archethic.P2P.Message do
<<16::8, address::binary>>
end

def encode(%GetTransactionInputs{address: address}) do
<<17::8, address::binary>>
def encode(%GetTransactionInputs{address: address, offset: offset}) do
<<17::8, address::binary, VarInt.from_value(offset)::binary>>
end

def encode(%GetTransactionChainLength{address: address}) do
Expand Down Expand Up @@ -489,7 +487,7 @@ defmodule Archethic.P2P.Message do
<<243::8, bit_size(view)::8, view::bitstring>>
end

def encode(%TransactionInputList{inputs: inputs}) do
def encode(%TransactionInputList{inputs: inputs, more?: more?, offset: offset}) do
inputs_bin =
inputs
|> Stream.map(&TransactionInput.serialize/1)
Expand All @@ -498,7 +496,10 @@ defmodule Archethic.P2P.Message do

encoded_inputs_length = length(inputs) |> VarInt.from_value()

<<244::8, encoded_inputs_length::binary, inputs_bin::bitstring>>
more_bit = if more?, do: 1, else: 0

<<244::8, encoded_inputs_length::binary, inputs_bin::bitstring, more_bit::1,
VarInt.from_value(offset)::binary>>
end

def encode(%TransactionChainLength{length: length}) do
Expand Down Expand Up @@ -823,7 +824,8 @@ defmodule Archethic.P2P.Message do

def decode(<<17::8, rest::bitstring>>) do
{address, rest} = Utils.deserialize_address(rest)
{%GetTransactionInputs{address: address}, rest}
{offset, rest} = VarInt.get_value(rest)
{%GetTransactionInputs{address: address, offset: offset}, rest}
end

def decode(<<18::8, rest::bitstring>>) do
Expand Down Expand Up @@ -1034,10 +1036,18 @@ defmodule Archethic.P2P.Message do

def decode(<<244::8, rest::bitstring>>) do
{nb_inputs, rest} = rest |> VarInt.get_value()
{inputs, rest} = deserialize_transaction_inputs(rest, nb_inputs, [])

{inputs, <<more_bit::1, rest::bitstring>>} =
deserialize_transaction_inputs(rest, nb_inputs, [])

more? = more_bit == 1

{offset, rest} = VarInt.get_value(rest)

{%TransactionInputList{
inputs: inputs
inputs: inputs,
more?: more?,
offset: offset
}, rest}
end

Expand Down Expand Up @@ -1485,16 +1495,47 @@ defmodule Archethic.P2P.Message do
}
end

def process(%GetTransactionInputs{address: address}) do
def process(%GetTransactionInputs{address: address, offset: offset}) do
contract_inputs =
address
|> Contracts.list_contract_transactions()
|> Enum.map(fn {address, timestamp} ->
%TransactionInput{from: address, type: :call, timestamp: timestamp}
end)

inputs = Account.get_inputs(address) ++ contract_inputs
inputs_length = length(inputs)

%{inputs: inputs, offset: offset, more?: more?} =
inputs
|> Enum.with_index()
|> Enum.drop(offset)
|> Enum.reduce_while(%{inputs: [], offset: 0, more?: false}, fn {input, index}, acc ->
acc_size =
acc.inputs
|> Enum.map(&TransactionInput.serialize/1)
|> :erlang.list_to_bitstring()
|> byte_size()

input_size = TransactionInput.serialize(input) |> byte_size

if acc_size + input_size < 3_000_000 do
new_acc =
acc
|> Map.update!(:inputs, &[input | &1])
|> Map.put(:offset, index + 1)
|> Map.put(:more?, index + 1 < inputs_length)

{:cont, new_acc}
else
{:halt, acc}
end
end)

%TransactionInputList{
inputs: Account.get_inputs(address) ++ contract_inputs
inputs: inputs,
more?: more?,
offset: offset
}
end

Expand Down
5 changes: 3 additions & 2 deletions lib/archethic/p2p/message/get_transaction_inputs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ defmodule Archethic.P2P.Message.GetTransactionInputs do
Represents a message with to request the inputs (spent or unspents) from a transaction
"""
@enforce_keys [:address]
defstruct [:address]
defstruct [:address, offset: 0]

alias Archethic.Crypto

@type t :: %__MODULE__{
address: Crypto.versioned_hash()
address: Crypto.versioned_hash(),
offset: non_neg_integer()
}
end
6 changes: 4 additions & 2 deletions lib/archethic/p2p/message/transaction_input_list.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ defmodule Archethic.P2P.Message.TransactionInputList do
@moduledoc """
Represents a message with a list of transaction inputs
"""
defstruct inputs: []
defstruct inputs: [], more?: false, offset: 0

alias Archethic.TransactionChain.TransactionInput

@type t() :: %__MODULE__{
inputs: list(TransactionInput.t())
inputs: list(TransactionInput.t()),
more?: boolean(),
offset: non_neg_integer()
}
end
5 changes: 3 additions & 2 deletions lib/archethic/replication/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ defmodule Archethic.Replication.TransactionContext do
def fetch_transaction_inputs(address, timestamp = %DateTime{}) when is_binary(address) do
nodes = replication_nodes(address)

{:ok, inputs} = TransactionChain.fetch_inputs_remotely(address, nodes, timestamp)
inputs
address
|> TransactionChain.stream_inputs_remotely(nodes, timestamp)
|> Enum.to_list()
end

defp replication_nodes(address) do
Expand Down
41 changes: 33 additions & 8 deletions lib/archethic/transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -722,18 +722,43 @@ defmodule Archethic.TransactionChain do
end
end

@doc """
Stream the trnasaction inputs for a transaction address at a given time
"""
@spec stream_inputs_remotely(binary(), list(Node.t()), DateTime.t()) ::
Enumerable.t() | list(TransactionInput.t())
def stream_inputs_remotely(_, [], _, _), do: []

def stream_inputs_remotely(address, nodes, timestamp) do
Stream.resource(
fn -> fetch_inputs_remotely(address, nodes, timestamp) end,
fn
{inputs, true, offset} ->
{inputs, fetch_inputs_remotely(address, nodes, timestamp, offset)}

{inputs, false, _} ->
{inputs, :eof}

:eof ->
{:halt, nil}
end,
fn _ -> :ok end
)
end

@doc """
Fetch the transaction inputs for a transaction address at a given time
If the inputs exist, then they are returned in the shape of `{:ok, inputs}`.
If no nodes are able to answer the request, `{:error, :network_issue}` is returned.
"""
@spec fetch_inputs_remotely(address :: Crypto.versioned_hash(), list(Node.t()), DateTime.t()) ::
{:ok, list(TransactionInput.t())} | {:error, :network_issue}
def fetch_inputs_remotely(_, [], _), do: {:ok, []}
{inputs :: list(TransactionInput.t()), more? :: boolean(), offset :: non_neg_integer()}
def fetch_inputs_remotely(address, nodes, timestamp, offset \\ 0)
def fetch_inputs_remotely(_, [], _, _), do: {[], false, 0}

def fetch_inputs_remotely(address, nodes, timestamp = %DateTime{})
when is_binary(address) and is_list(nodes) do
def fetch_inputs_remotely(address, nodes, timestamp = %DateTime{}, offset)
when is_binary(address) and is_list(nodes) and is_integer(offset) and offset >= 0 do
conflict_resolver = fn results ->
results
|> Enum.sort_by(&length(&1.inputs), :desc)
Expand All @@ -742,15 +767,15 @@ defmodule Archethic.TransactionChain do

case P2P.quorum_read(
nodes,
%GetTransactionInputs{address: address},
%GetTransactionInputs{address: address, offset: offset},
conflict_resolver
) do
{:ok, %TransactionInputList{inputs: inputs}} ->
{:ok, %TransactionInputList{inputs: inputs, more?: more?, offset: offset}} ->
filtered_inputs = Enum.filter(inputs, &(DateTime.diff(&1.timestamp, timestamp) <= 0))
{:ok, filtered_inputs}
{filtered_inputs, more?, offset}

{:error, :network_issue} ->
{:error, :network_issue}
{[], false, 0}
end
end

Expand Down
11 changes: 9 additions & 2 deletions lib/archethic_web/graphql_schema.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,16 @@ defmodule ArchethicWeb.GraphQLSchema do
"""
field :transaction_inputs, list_of(:transaction_input) do
arg(:address, non_null(:address))
arg(:paging_offset, :paging_offset)

resolve(fn %{address: address}, _ ->
Resolver.get_inputs(address)
resolve(fn args = %{address: address}, _ ->
case Map.get(args, :paging_offset) do
nil ->
Resolver.get_inputs(address)

page ->
Resolver.get_inputs(address, page)
end
end)
end

Expand Down
19 changes: 17 additions & 2 deletions lib/archethic_web/graphql_schema/page_type.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,28 @@ defmodule ArchethicWeb.GraphQLSchema.PageType do
parse(&do_parse/1)
end

@spec do_parse(Absinthe.Blueprint.Input.Integer.t()) ::
{:ok, integer()} | :error
@desc """
The [PagingOffset] scalar type represents the paging offset, which is the number of elements to skip
for the next reading
"""
scalar :paging_offset do
parse(&do_parse_offset/1)
end

defp do_parse(%Absinthe.Blueprint.Input.Integer{value: page_value}) when page_value >= 1 do
{:ok, page_value}
end

defp do_parse(_page) do
:error
end

defp do_parse_offset(%Absinthe.Blueprint.Input.Integer{value: page_value})
when page_value >= 0 do
{:ok, page_value}
end

defp do_parse_offset(_page) do
:error
end
end
20 changes: 14 additions & 6 deletions lib/archethic_web/graphql_schema/resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,21 @@ defmodule ArchethicWeb.GraphQLSchema.Resolver do
end

def get_inputs(address) do
case Archethic.get_transaction_inputs(address) do
{:ok, inputs} ->
{:ok, Enum.map(inputs, &TransactionInput.to_map/1)}
inputs =
address
|> Archethic.get_transaction_inputs()
|> Enum.map(&TransactionInput.to_map/1)

{:error, _} = e ->
e
end
{:ok, inputs}
end

def get_inputs(address, paging_offset) do
inputs =
address
|> Archethic.get_transaction_inputs(paging_offset)
|> Enum.map(&TransactionInput.to_map/1)

{:ok, inputs}
end

def shared_secrets do
Expand Down
25 changes: 9 additions & 16 deletions lib/archethic_web/live/transaction_details_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ defmodule ArchethicWeb.TransactionDetailsLive do
previous_address = Transaction.previous_address(tx)

with {:ok, balance} <- Archethic.get_balance(address),
{:ok, inputs} <- Archethic.get_transaction_inputs(address) do
inputs <- Archethic.get_transaction_inputs(address) do
ledger_inputs = Enum.reject(inputs, &(&1.type == :call))
contract_inputs = Enum.filter(inputs, &(&1.type == :call))
uco_price_at_time = tx.validation_stamp.timestamp |> OracleChain.get_uco_price()
Expand All @@ -107,22 +107,15 @@ defmodule ArchethicWeb.TransactionDetailsLive do
end

defp handle_not_existing_transaction(socket, address) do
case Archethic.get_transaction_inputs(address) do
{:ok, inputs} ->
ledger_inputs = Enum.reject(inputs, &(&1.type == :call))
contract_inputs = Enum.filter(inputs, &(&1.type == :call))
inputs = Archethic.get_transaction_inputs(address)
ledger_inputs = Enum.reject(inputs, &(&1.type == :call))
contract_inputs = Enum.filter(inputs, &(&1.type == :call))

socket
|> assign(:address, address)
|> assign(:inputs, ledger_inputs)
|> assign(:calls, contract_inputs)
|> assign(:error, :not_exists)

{:error, :network_issue} ->
socket
|> assign(:address, address)
|> assign(:error, :network_issue)
end
socket
|> assign(:address, address)
|> assign(:inputs, ledger_inputs)
|> assign(:calls, contract_inputs)
|> assign(:error, :not_exists)
end

defp handle_invalid_address(socket, address) do
Expand Down

0 comments on commit 5c7cf69

Please sign in to comment.