Skip to content

Commit

Permalink
Get chain and limit on the requested address (#528)
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel committed Aug 19, 2022
1 parent 20f7548 commit 2cf7a6e
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 134 deletions.
63 changes: 44 additions & 19 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -189,28 +189,53 @@ defmodule Archethic do
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

try do
{local_chain, paging_address} =
case TransactionChain.get_last_local_address(address) do
nil -> {[], nil}
last_address -> {TransactionChain.get_locally(last_address), last_address}
end

remote_chain =
if address != paging_address do
# We directly check if the transaction exists and retrieve the genesis
# Otherwise we are requesting the genesis address remotly
genesis_address =
with ^address <- TransactionChain.get_genesis_address(address),
{:ok, genesis_address} <- TransactionChain.fetch_genesis_address_remotely(address) do
genesis_address
else
_ ->
address
|> TransactionChain.stream_remotely(nodes, paging_address)
|> Enum.to_list()
|> List.flatten()
end

%{transactions: local_chain, last_address: last_local_address} =
genesis_address
|> TransactionChain.scan_chain(address)
|> Enum.reduce_while(%{transactions: [], last_address: nil}, fn transaction, acc ->
# We stop at the desire the transaction
if acc.last_address == address do
{:halt, acc}
else
[]
end
new_acc =
acc
|> Map.update!(:transactions, &(&1 ++ [transaction]))
# We log the last local address
|> Map.put(:last_address, transaction.address)

{:ok, local_chain ++ remote_chain}
catch
_ ->
{:error, :network_issue}
end
{:cont, new_acc}
end
end)

remote_chain =
if address != last_local_address do
address
|> TransactionChain.stream_remotely(nodes, last_local_address)
|> Stream.flat_map(& &1)
|> Stream.transform(nil, fn
_tx, ^address ->
{:halt, address}

tx, _ ->
{[tx], tx.address}
end)
|> Enum.to_list()
else
[]
end

{:ok, local_chain ++ remote_chain}
end

@doc """
Expand Down
7 changes: 6 additions & 1 deletion lib/archethic/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ defmodule Archethic.DB do
@callback get_last_chain_address(binary(), DateTime.t()) :: {binary(), DateTime.t()}
@callback get_first_chain_address(binary()) :: binary()
@callback get_first_public_key(Crypto.key()) :: binary()

@callback scan_chain(
genesis_address :: binary(),
limit_address :: binary(),
fields :: list(),
paging_state :: nil | binary()
) :: Enumerable.t()
@callback register_stats(DateTime.t(), float(), non_neg_integer(), non_neg_integer()) :: :ok
@callback get_latest_tps() :: float()
@callback get_latest_burned_fees() :: non_neg_integer()
Expand Down
9 changes: 9 additions & 0 deletions lib/archethic/db/embedded_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -312,4 +312,13 @@ defmodule Archethic.DB.EmbeddedImpl do
}
}
defdelegate get_last_p2p_summaries, to: P2PView, as: :get_views

@doc """
Read chain from the beginning until a given limit address
"""
@spec scan_chain(binary(), binary(), list(), binary() | nil) ::
{list(Transaction.t()), boolean(), binary() | nil}
def scan_chain(genesis_address, limit_address, fields \\ [], paging_state \\ nil) do
ChainReader.scan_chain(genesis_address, limit_address, fields, paging_state, db_path())
end
end
56 changes: 46 additions & 10 deletions lib/archethic/db/embedded_impl/chain_reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
{[], false, ""}

{:ok, %{genesis_address: genesis_address}} ->
filepath = ChainWriter.chain_path(db_path, genesis_address)
fd = File.open!(filepath, [:binary, :read])

{transactions, more?, paging_state} =
process_get_chain({address, fields, opts, db_path}, genesis_address)
process_get_chain(fd, address, fields, opts, db_path)

:telemetry.execute([:archethic, :db], %{duration: System.monotonic_time() - start}, %{
query: "get_transaction_chain"
Expand All @@ -75,32 +78,30 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
end
end

defp process_get_chain({address, fields, opts, db_path}, genesis_address) do
filepath = ChainWriter.chain_path(db_path, genesis_address)
fd = File.open!(filepath, [:binary, :read])
defp process_get_chain(fd, address, fields, opts, db_path) do
# Set the file cursor position to the paging state
case Keyword.get(opts, :paging_state) do
nil ->
:file.position(fd, 0)
process_get_chain({address, fields}, {fd})
do_process_get_chain(fd, address, fields)

paging_address ->
case ChainIndex.get_tx_entry(paging_address, db_path) do
{:ok, %{offset: offset, size: size}} ->
:file.position(fd, offset + size)
process_get_chain({address, fields}, {fd})
do_process_get_chain(fd, address, fields)

{:error, :not_exists} ->
{[], false, ""}
end
end
end

defp process_get_chain({address, fields}, {fd}) do
defp do_process_get_chain(fd, address, fields) do
column_names = fields_to_column_names(fields)

# Read the transactions until the nb of transactions to fullfil the page (ie. 10 transactions)
{transactions, more?, paging_state} = scan_chain(fd, column_names, address)
{transactions, more?, paging_state} = do_scan_chain(fd, column_names, address)
:file.close(fd)

{transactions, more?, paging_state}
Expand Down Expand Up @@ -138,7 +139,42 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
end
end

defp scan_chain(fd, fields, limit_address, acc \\ []) do
@doc """
Read chain from the beginning until a given limit address
"""
@spec scan_chain(
genesis_address :: binary(),
limit_address :: binary(),
list(),
paging_address :: nil | binary(),
db_path :: binary()
) ::
{list(Transaction.t()), boolean(), binary() | nil}
def scan_chain(genesis_address, limit_address, fields, paging_address, db_path) do
filepath = ChainWriter.chain_path(db_path, genesis_address)
column_names = fields_to_column_names(fields)

case File.open(filepath, [:binary, :read]) do
{:ok, fd} ->
if paging_address do
case ChainIndex.get_tx_entry(paging_address, db_path) do
{:ok, %{offset: offset, size: size}} ->
:file.position(fd, offset + size)
do_scan_chain(fd, column_names, limit_address)

{:error, :not_exists} ->
{[], false, ""}
end
else
do_scan_chain(fd, column_names, limit_address)
end

{:error, _} ->
{[], false, nil}
end
end

defp do_scan_chain(fd, fields, limit_address, acc \\ []) do
case :file.read(fd, 8) do
{:ok, <<size::32, version::32>>} ->
if length(acc) == @page_size do
Expand All @@ -158,7 +194,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
if tx.address == limit_address do
{Enum.reverse([tx | acc]), false, nil}
else
scan_chain(fd, fields, limit_address, [tx | acc])
do_scan_chain(fd, fields, limit_address, [tx | acc])
end
end

Expand Down
34 changes: 22 additions & 12 deletions lib/archethic/transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ defmodule Archethic.TransactionChain do
Stream the transactions from a chain
"""
@spec stream(binary(), list()) :: Enumerable.t() | list(Transaction.t())
def stream(address, fields) do
def stream(address, fields \\ []) do
Stream.resource(
fn -> DB.get_transaction_chain(address, fields, []) end,
fn
Expand Down Expand Up @@ -717,17 +717,8 @@ defmodule Archethic.TransactionChain do
|> List.first()
end

# Get transaction chain size to calculate timeout
chain_size =
case Archethic.get_transaction_chain_length(address) do
{:ok, chain_size} ->
chain_size

_ ->
1
end

timeout = Message.get_max_timeout() + Message.get_max_timeout() * chain_size
# We got transactions by batch of 10 transactions
timeout = Message.get_max_timeout() + Message.get_max_timeout() * 10

case P2P.quorum_read(
nodes,
Expand Down Expand Up @@ -889,4 +880,23 @@ defmodule Archethic.TransactionChain do
def fetch_chain_db({chain, true, paging_address}, acc) do
fetch_chain_db(get(paging_address, [], paging_state: paging_address), acc ++ chain)
end

@spec scan_chain(genesis_address :: binary(), limit_address :: binary(), fields :: list()) ::
Enumerable.t()
def scan_chain(genesis_address, limit_address, fields \\ []) do
Stream.resource(
fn -> DB.scan_chain(genesis_address, limit_address, fields, nil) end,
fn
{transactions, true, paging_state} ->
{transactions, DB.scan_chain(genesis_address, limit_address, fields, paging_state)}

{transactions, false, _} ->
{transactions, :eof}

:eof ->
{:halt, nil}
end,
fn _ -> :ok end
)
end
end
50 changes: 50 additions & 0 deletions test/archethic/db/embedded_impl_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,56 @@ defmodule Archethic.DB.EmbeddedTest do
end
end

describe "scan_chain/4" do
test "should return the list of all the transactions until the one given" do
transactions =
Enum.map(1..20, fn i ->
TransactionFactory.create_valid_transaction([],
index: i,
timestamp: DateTime.utc_now() |> DateTime.add(i * 60)
)
end)

genesis_address = transactions |> List.first() |> Transaction.previous_address()
:ok = EmbeddedImpl.write_transaction_chain(transactions)

assert {txs, false, nil} =
EmbeddedImpl.scan_chain(genesis_address, Enum.at(transactions, 2).address)

assert Enum.take(transactions, 3) == txs
end

test "should return a page and its paging state" do
transactions =
Enum.map(1..20, fn i ->
TransactionFactory.create_valid_transaction([],
index: i,
timestamp: DateTime.utc_now() |> DateTime.add(i * 60)
)
end)

EmbeddedImpl.write_transaction_chain(transactions)
genesis_address = transactions |> List.first() |> Transaction.previous_address()

{page, true, paging_state} =
EmbeddedImpl.scan_chain(genesis_address, Enum.at(transactions, 12).address)

assert length(page) == 10
assert page == Enum.take(transactions, 10)
assert paging_state == Enum.at(transactions, 9).address

{page2, false, nil} =
EmbeddedImpl.scan_chain(
genesis_address,
Enum.at(transactions, 12).address,
[],
paging_state
)

assert length(page2) == 3
end
end

describe "chain_size/1" do
test "should return 0 when there are not transactions" do
assert 0 == EmbeddedImpl.chain_size(:crypto.strong_rand_bytes(32))
Expand Down
Loading

0 comments on commit 2cf7a6e

Please sign in to comment.