Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix get transaction chain lookup #528

Merged
1 commit merged into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 44 additions & 19 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -189,28 +189,53 @@ defmodule Archethic do
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

try do
{local_chain, paging_address} =
case TransactionChain.get_last_local_address(address) do
nil -> {[], nil}
last_address -> {TransactionChain.get_locally(last_address), last_address}
end

remote_chain =
if address != paging_address do
# We directly check if the transaction exists and retrieve the genesis
# Otherwise we are requesting the genesis address remotly
genesis_address =
with ^address <- TransactionChain.get_genesis_address(address),
{:ok, genesis_address} <- TransactionChain.fetch_genesis_address_remotely(address) do
genesis_address
else
_ ->
address
|> TransactionChain.stream_remotely(nodes, paging_address)
|> Enum.to_list()
|> List.flatten()
end

%{transactions: local_chain, last_address: last_local_address} =
genesis_address
|> TransactionChain.scan_chain(address)
|> Enum.reduce_while(%{transactions: [], last_address: nil}, fn transaction, acc ->
This conversation was marked as resolved.
Show resolved Hide resolved
# We stop at the desire the transaction
if acc.last_address == address do
{:halt, acc}
else
[]
end
new_acc =
acc
|> Map.update!(:transactions, &(&1 ++ [transaction]))
# We log the last local address
|> Map.put(:last_address, transaction.address)

{:ok, local_chain ++ remote_chain}
catch
_ ->
{:error, :network_issue}
end
{:cont, new_acc}
end
end)

remote_chain =
if address != last_local_address do
address
|> TransactionChain.stream_remotely(nodes, last_local_address)
|> Stream.flat_map(& &1)
|> Stream.transform(nil, fn
_tx, ^address ->
{:halt, address}

tx, _ ->
{[tx], tx.address}
end)
|> Enum.to_list()
else
[]
end

{:ok, local_chain ++ remote_chain}
end

@doc """
Expand Down
7 changes: 6 additions & 1 deletion lib/archethic/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ defmodule Archethic.DB do
@callback get_last_chain_address(binary(), DateTime.t()) :: {binary(), DateTime.t()}
@callback get_first_chain_address(binary()) :: binary()
@callback get_first_public_key(Crypto.key()) :: binary()

@callback scan_chain(
genesis_address :: binary(),
limit_address :: binary(),
fields :: list(),
paging_state :: nil | binary()
) :: Enumerable.t()
@callback register_stats(DateTime.t(), float(), non_neg_integer(), non_neg_integer()) :: :ok
@callback get_latest_tps() :: float()
@callback get_latest_burned_fees() :: non_neg_integer()
Expand Down
9 changes: 9 additions & 0 deletions lib/archethic/db/embedded_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -312,4 +312,13 @@ defmodule Archethic.DB.EmbeddedImpl do
}
}
defdelegate get_last_p2p_summaries, to: P2PView, as: :get_views

@doc """
Read chain from the beginning until a given limit address
"""
@spec scan_chain(binary(), binary(), list(), binary() | nil) ::
{list(Transaction.t()), boolean(), binary() | nil}
def scan_chain(genesis_address, limit_address, fields \\ [], paging_state \\ nil) do
ChainReader.scan_chain(genesis_address, limit_address, fields, paging_state, db_path())
end
end
56 changes: 46 additions & 10 deletions lib/archethic/db/embedded_impl/chain_reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
{[], false, ""}

{:ok, %{genesis_address: genesis_address}} ->
filepath = ChainWriter.chain_path(db_path, genesis_address)
fd = File.open!(filepath, [:binary, :read])

{transactions, more?, paging_state} =
process_get_chain({address, fields, opts, db_path}, genesis_address)
process_get_chain(fd, address, fields, opts, db_path)

:telemetry.execute([:archethic, :db], %{duration: System.monotonic_time() - start}, %{
query: "get_transaction_chain"
Expand All @@ -75,32 +78,30 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
end
end

defp process_get_chain({address, fields, opts, db_path}, genesis_address) do
filepath = ChainWriter.chain_path(db_path, genesis_address)
fd = File.open!(filepath, [:binary, :read])
defp process_get_chain(fd, address, fields, opts, db_path) do
# Set the file cursor position to the paging state
case Keyword.get(opts, :paging_state) do
nil ->
:file.position(fd, 0)
process_get_chain({address, fields}, {fd})
do_process_get_chain(fd, address, fields)

paging_address ->
case ChainIndex.get_tx_entry(paging_address, db_path) do
{:ok, %{offset: offset, size: size}} ->
:file.position(fd, offset + size)
process_get_chain({address, fields}, {fd})
do_process_get_chain(fd, address, fields)

{:error, :not_exists} ->
{[], false, ""}
end
end
end

defp process_get_chain({address, fields}, {fd}) do
defp do_process_get_chain(fd, address, fields) do
column_names = fields_to_column_names(fields)

# Read the transactions until the nb of transactions to fullfil the page (ie. 10 transactions)
{transactions, more?, paging_state} = scan_chain(fd, column_names, address)
{transactions, more?, paging_state} = do_scan_chain(fd, column_names, address)
:file.close(fd)

{transactions, more?, paging_state}
Expand Down Expand Up @@ -138,7 +139,42 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
end
end

defp scan_chain(fd, fields, limit_address, acc \\ []) do
@doc """
Read chain from the beginning until a given limit address
"""
@spec scan_chain(
genesis_address :: binary(),
limit_address :: binary(),
list(),
paging_address :: nil | binary(),
db_path :: binary()
) ::
{list(Transaction.t()), boolean(), binary() | nil}
def scan_chain(genesis_address, limit_address, fields, paging_address, db_path) do
filepath = ChainWriter.chain_path(db_path, genesis_address)
column_names = fields_to_column_names(fields)

case File.open(filepath, [:binary, :read]) do
{:ok, fd} ->
if paging_address do
case ChainIndex.get_tx_entry(paging_address, db_path) do
{:ok, %{offset: offset, size: size}} ->
:file.position(fd, offset + size)
do_scan_chain(fd, column_names, limit_address)

{:error, :not_exists} ->
{[], false, ""}
end
else
do_scan_chain(fd, column_names, limit_address)
end

{:error, _} ->
{[], false, nil}
end
end

defp do_scan_chain(fd, fields, limit_address, acc \\ []) do
case :file.read(fd, 8) do
{:ok, <<size::32, version::32>>} ->
if length(acc) == @page_size do
Expand All @@ -158,7 +194,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
if tx.address == limit_address do
{Enum.reverse([tx | acc]), false, nil}
else
scan_chain(fd, fields, limit_address, [tx | acc])
do_scan_chain(fd, fields, limit_address, [tx | acc])
end
end

Expand Down
34 changes: 22 additions & 12 deletions lib/archethic/transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ defmodule Archethic.TransactionChain do
Stream the transactions from a chain
"""
@spec stream(binary(), list()) :: Enumerable.t() | list(Transaction.t())
def stream(address, fields) do
def stream(address, fields \\ []) do
Stream.resource(
fn -> DB.get_transaction_chain(address, fields, []) end,
fn
Expand Down Expand Up @@ -717,17 +717,8 @@ defmodule Archethic.TransactionChain do
|> List.first()
end

# Get transaction chain size to calculate timeout
chain_size =
case Archethic.get_transaction_chain_length(address) do
{:ok, chain_size} ->
chain_size

_ ->
1
end

timeout = Message.get_max_timeout() + Message.get_max_timeout() * chain_size
# We got transactions by batch of 10 transactions
timeout = Message.get_max_timeout() + Message.get_max_timeout() * 10

case P2P.quorum_read(
nodes,
Expand Down Expand Up @@ -889,4 +880,23 @@ defmodule Archethic.TransactionChain do
def fetch_chain_db({chain, true, paging_address}, acc) do
fetch_chain_db(get(paging_address, [], paging_state: paging_address), acc ++ chain)
end

@spec scan_chain(genesis_address :: binary(), limit_address :: binary(), fields :: list()) ::
Enumerable.t()
def scan_chain(genesis_address, limit_address, fields \\ []) do
Stream.resource(
fn -> DB.scan_chain(genesis_address, limit_address, fields, nil) end,
fn
{transactions, true, paging_state} ->
{transactions, DB.scan_chain(genesis_address, limit_address, fields, paging_state)}

{transactions, false, _} ->
{transactions, :eof}

:eof ->
{:halt, nil}
end,
fn _ -> :ok end
)
end
end
50 changes: 50 additions & 0 deletions test/archethic/db/embedded_impl_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,56 @@ defmodule Archethic.DB.EmbeddedTest do
end
end

describe "scan_chain/4" do
test "should return the list of all the transactions until the one given" do
transactions =
Enum.map(1..20, fn i ->
TransactionFactory.create_valid_transaction([],
index: i,
timestamp: DateTime.utc_now() |> DateTime.add(i * 60)
)
end)

genesis_address = transactions |> List.first() |> Transaction.previous_address()
:ok = EmbeddedImpl.write_transaction_chain(transactions)

assert {txs, false, nil} =
EmbeddedImpl.scan_chain(genesis_address, Enum.at(transactions, 2).address)

assert Enum.take(transactions, 3) == txs
end

test "should return a page and its paging state" do
transactions =
Enum.map(1..20, fn i ->
TransactionFactory.create_valid_transaction([],
index: i,
timestamp: DateTime.utc_now() |> DateTime.add(i * 60)
)
end)

EmbeddedImpl.write_transaction_chain(transactions)
genesis_address = transactions |> List.first() |> Transaction.previous_address()

{page, true, paging_state} =
EmbeddedImpl.scan_chain(genesis_address, Enum.at(transactions, 12).address)

assert length(page) == 10
assert page == Enum.take(transactions, 10)
assert paging_state == Enum.at(transactions, 9).address

{page2, false, nil} =
EmbeddedImpl.scan_chain(
genesis_address,
Enum.at(transactions, 12).address,
[],
paging_state
)

assert length(page2) == 3
end
end

describe "chain_size/1" do
test "should return 0 when there are not transactions" do
assert 0 == EmbeddedImpl.chain_size(:crypto.strong_rand_bytes(32))
Expand Down
Loading