From 53720c707b92775c99e588219f153afa8477b573 Mon Sep 17 00:00:00 2001 From: Samuel Date: Thu, 18 Aug 2022 16:34:52 +0200 Subject: [PATCH] Get chain and limit on the requested address --- lib/archethic.ex | 63 +++++++---- lib/archethic/db.ex | 7 +- lib/archethic/db/embedded_impl.ex | 9 ++ .../db/embedded_impl/chain_reader.ex | 56 ++++++++-- lib/archethic/transaction_chain.ex | 34 ++++-- test/archethic/db/embedded_impl_test.exs | 50 +++++++++ test/archethic_test.exs | 105 +++--------------- test/support/template.ex | 1 + 8 files changed, 191 insertions(+), 134 deletions(-) diff --git a/lib/archethic.ex b/lib/archethic.ex index 02070d245..cf93d484f 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -189,28 +189,53 @@ defmodule Archethic do |> P2P.nearest_nodes() |> Enum.filter(&Node.locally_available?/1) - try do - {local_chain, paging_address} = - case TransactionChain.get_last_local_address(address) do - nil -> {[], nil} - last_address -> {TransactionChain.get_locally(last_address), last_address} - end - - remote_chain = - if address != paging_address do + # We directly check if the transaction exists and retrieve the genesis + # Otherwise we are requesting the genesis address remotly + genesis_address = + with ^address <- TransactionChain.get_genesis_address(address), + {:ok, genesis_address} <- TransactionChain.fetch_genesis_address_remotely(address) do + genesis_address + else + _ -> address - |> TransactionChain.stream_remotely(nodes, paging_address) - |> Enum.to_list() - |> List.flatten() + end + + %{transactions: local_chain, last_address: last_local_address} = + genesis_address + |> TransactionChain.scan_chain(address) + |> Enum.reduce_while(%{transactions: [], last_address: nil}, fn transaction, acc -> + # We stop at the desire the transaction + if acc.last_address == address do + {:halt, acc} else - [] - end + new_acc = + acc + |> Map.update!(:transactions, &(&1 ++ [transaction])) + # We log the last local address + |> Map.put(:last_address, transaction.address) - {:ok, local_chain ++ remote_chain} - catch - _ -> - {:error, :network_issue} - end + {:cont, new_acc} + end + end) + + remote_chain = + if address != last_local_address do + address + |> TransactionChain.stream_remotely(nodes, last_local_address) + |> Stream.flat_map(& &1) + |> Stream.transform(nil, fn + _tx, ^address -> + {:halt, address} + + tx, _ -> + {[tx], tx.address} + end) + |> Enum.to_list() + else + [] + end + + {:ok, local_chain ++ remote_chain} end @doc """ diff --git a/lib/archethic/db.ex b/lib/archethic/db.ex index 856e4f762..2db74124d 100644 --- a/lib/archethic/db.ex +++ b/lib/archethic/db.ex @@ -35,7 +35,12 @@ defmodule Archethic.DB do @callback get_last_chain_address(binary(), DateTime.t()) :: {binary(), DateTime.t()} @callback get_first_chain_address(binary()) :: binary() @callback get_first_public_key(Crypto.key()) :: binary() - + @callback scan_chain( + genesis_address :: binary(), + limit_address :: binary(), + fields :: list(), + paging_state :: nil | binary() + ) :: Enumerable.t() @callback register_stats(DateTime.t(), float(), non_neg_integer(), non_neg_integer()) :: :ok @callback get_latest_tps() :: float() @callback get_latest_burned_fees() :: non_neg_integer() diff --git a/lib/archethic/db/embedded_impl.ex b/lib/archethic/db/embedded_impl.ex index 3e5ab47e9..337be709e 100644 --- a/lib/archethic/db/embedded_impl.ex +++ b/lib/archethic/db/embedded_impl.ex @@ -312,4 +312,13 @@ defmodule Archethic.DB.EmbeddedImpl do } } defdelegate get_last_p2p_summaries, to: P2PView, as: :get_views + + @doc """ + Read chain from the beginning until a given limit address + """ + @spec scan_chain(binary(), binary(), list(), binary() | nil) :: + {list(Transaction.t()), boolean(), binary() | nil} + def scan_chain(genesis_address, limit_address, fields \\ [], paging_state \\ nil) do + ChainReader.scan_chain(genesis_address, limit_address, fields, paging_state, db_path()) + end end diff --git a/lib/archethic/db/embedded_impl/chain_reader.ex b/lib/archethic/db/embedded_impl/chain_reader.ex index 8362469dd..63a24032e 100644 --- a/lib/archethic/db/embedded_impl/chain_reader.ex +++ b/lib/archethic/db/embedded_impl/chain_reader.ex @@ -64,8 +64,11 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do {[], false, ""} {:ok, %{genesis_address: genesis_address}} -> + filepath = ChainWriter.chain_path(db_path, genesis_address) + fd = File.open!(filepath, [:binary, :read]) + {transactions, more?, paging_state} = - process_get_chain({address, fields, opts, db_path}, genesis_address) + process_get_chain(fd, address, fields, opts, db_path) :telemetry.execute([:archethic, :db], %{duration: System.monotonic_time() - start}, %{ query: "get_transaction_chain" @@ -75,20 +78,18 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do end end - defp process_get_chain({address, fields, opts, db_path}, genesis_address) do - filepath = ChainWriter.chain_path(db_path, genesis_address) - fd = File.open!(filepath, [:binary, :read]) + defp process_get_chain(fd, address, fields, opts, db_path) do # Set the file cursor position to the paging state case Keyword.get(opts, :paging_state) do nil -> :file.position(fd, 0) - process_get_chain({address, fields}, {fd}) + do_process_get_chain(fd, address, fields) paging_address -> case ChainIndex.get_tx_entry(paging_address, db_path) do {:ok, %{offset: offset, size: size}} -> :file.position(fd, offset + size) - process_get_chain({address, fields}, {fd}) + do_process_get_chain(fd, address, fields) {:error, :not_exists} -> {[], false, ""} @@ -96,11 +97,11 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do end end - defp process_get_chain({address, fields}, {fd}) do + defp do_process_get_chain(fd, address, fields) do column_names = fields_to_column_names(fields) # Read the transactions until the nb of transactions to fullfil the page (ie. 10 transactions) - {transactions, more?, paging_state} = scan_chain(fd, column_names, address) + {transactions, more?, paging_state} = do_scan_chain(fd, column_names, address) :file.close(fd) {transactions, more?, paging_state} @@ -138,7 +139,42 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do end end - defp scan_chain(fd, fields, limit_address, acc \\ []) do + @doc """ + Read chain from the beginning until a given limit address + """ + @spec scan_chain( + genesis_address :: binary(), + limit_address :: binary(), + list(), + paging_address :: nil | binary(), + db_path :: binary() + ) :: + {list(Transaction.t()), boolean(), binary() | nil} + def scan_chain(genesis_address, limit_address, fields, paging_address, db_path) do + filepath = ChainWriter.chain_path(db_path, genesis_address) + column_names = fields_to_column_names(fields) + + case File.open(filepath, [:binary, :read]) do + {:ok, fd} -> + if paging_address do + case ChainIndex.get_tx_entry(paging_address, db_path) do + {:ok, %{offset: offset, size: size}} -> + :file.position(fd, offset + size) + do_scan_chain(fd, column_names, limit_address) + + {:error, :not_exists} -> + {[], false, ""} + end + else + do_scan_chain(fd, column_names, limit_address) + end + + {:error, _} -> + {[], false, nil} + end + end + + defp do_scan_chain(fd, fields, limit_address, acc \\ []) do case :file.read(fd, 8) do {:ok, <>} -> if length(acc) == @page_size do @@ -158,7 +194,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do if tx.address == limit_address do {Enum.reverse([tx | acc]), false, nil} else - scan_chain(fd, fields, limit_address, [tx | acc]) + do_scan_chain(fd, fields, limit_address, [tx | acc]) end end diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index dc9eb218b..7c76fb86d 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -604,7 +604,7 @@ defmodule Archethic.TransactionChain do Stream the transactions from a chain """ @spec stream(binary(), list()) :: Enumerable.t() | list(Transaction.t()) - def stream(address, fields) do + def stream(address, fields \\ []) do Stream.resource( fn -> DB.get_transaction_chain(address, fields, []) end, fn @@ -717,17 +717,8 @@ defmodule Archethic.TransactionChain do |> List.first() end - # Get transaction chain size to calculate timeout - chain_size = - case Archethic.get_transaction_chain_length(address) do - {:ok, chain_size} -> - chain_size - - _ -> - 1 - end - - timeout = Message.get_max_timeout() + Message.get_max_timeout() * chain_size + # We got transactions by batch of 10 transactions + timeout = Message.get_max_timeout() + Message.get_max_timeout() * 10 case P2P.quorum_read( nodes, @@ -889,4 +880,23 @@ defmodule Archethic.TransactionChain do def fetch_chain_db({chain, true, paging_address}, acc) do fetch_chain_db(get(paging_address, [], paging_state: paging_address), acc ++ chain) end + + @spec scan_chain(genesis_address :: binary(), limit_address :: binary(), fields :: list()) :: + Enumerable.t() + def scan_chain(genesis_address, limit_address, fields \\ []) do + Stream.resource( + fn -> DB.scan_chain(genesis_address, limit_address, fields, nil) end, + fn + {transactions, true, paging_state} -> + {transactions, DB.scan_chain(genesis_address, limit_address, fields, paging_state)} + + {transactions, false, _} -> + {transactions, :eof} + + :eof -> + {:halt, nil} + end, + fn _ -> :ok end + ) + end end diff --git a/test/archethic/db/embedded_impl_test.exs b/test/archethic/db/embedded_impl_test.exs index 01d6a95e6..5d1a323d2 100644 --- a/test/archethic/db/embedded_impl_test.exs +++ b/test/archethic/db/embedded_impl_test.exs @@ -244,6 +244,56 @@ defmodule Archethic.DB.EmbeddedTest do end end + describe "scan_chain/4" do + test "should return the list of all the transactions until the one given" do + transactions = + Enum.map(1..20, fn i -> + TransactionFactory.create_valid_transaction([], + index: i, + timestamp: DateTime.utc_now() |> DateTime.add(i * 60) + ) + end) + + genesis_address = transactions |> List.first() |> Transaction.previous_address() + :ok = EmbeddedImpl.write_transaction_chain(transactions) + + assert {txs, false, nil} = + EmbeddedImpl.scan_chain(genesis_address, Enum.at(transactions, 2).address) + + assert Enum.take(transactions, 3) == txs + end + + test "should return a page and its paging state" do + transactions = + Enum.map(1..20, fn i -> + TransactionFactory.create_valid_transaction([], + index: i, + timestamp: DateTime.utc_now() |> DateTime.add(i * 60) + ) + end) + + EmbeddedImpl.write_transaction_chain(transactions) + genesis_address = transactions |> List.first() |> Transaction.previous_address() + + {page, true, paging_state} = + EmbeddedImpl.scan_chain(genesis_address, Enum.at(transactions, 12).address) + + assert length(page) == 10 + assert page == Enum.take(transactions, 10) + assert paging_state == Enum.at(transactions, 9).address + + {page2, false, nil} = + EmbeddedImpl.scan_chain( + genesis_address, + Enum.at(transactions, 12).address, + [], + paging_state + ) + + assert length(page2) == 3 + end + end + describe "chain_size/1" do test "should return 0 when there are not transactions" do assert 0 == EmbeddedImpl.chain_size(:crypto.strong_rand_bytes(32)) diff --git a/test/archethic_test.exs b/test/archethic_test.exs index 40f6e0671..90803406d 100644 --- a/test/archethic_test.exs +++ b/test/archethic_test.exs @@ -10,12 +10,12 @@ defmodule ArchethicTest do alias Archethic.P2P alias Archethic.P2P.Message.Balance alias Archethic.P2P.Message.GetBalance + alias Archethic.P2P.Message.GetFirstAddress alias Archethic.P2P.Message.GetLastTransactionAddress alias Archethic.P2P.Message.GetTransaction alias Archethic.P2P.Message.GetTransactionChain alias Archethic.P2P.Message.GetTransactionChainLength alias Archethic.P2P.Message.GetTransactionInputs - alias Archethic.P2P.Message.GetFirstAddress alias Archethic.P2P.Message.FirstAddress alias Archethic.P2P.Message.LastTransactionAddress @@ -300,6 +300,9 @@ defmodule ArchethicTest do MockClient |> stub(:send_message, fn + _, %GetFirstAddress{address: "@Alice2"}, _ -> + {:ok, %FirstAddress{address: "@Alice0"}} + _, %GetTransactionChain{}, _ -> {:ok, %TransactionList{ @@ -309,12 +312,6 @@ defmodule ArchethicTest do %Transaction{address: "@Alice2"} ] }} - - _, %GetTransactionChainLength{}, _ -> - %TransactionChainLength{length: 3} - - _, %GetFirstAddress{}, _ -> - {:ok, %NotFound{}} end) assert {:ok, @@ -348,116 +345,40 @@ defmodule ArchethicTest do }) MockDB - |> stub(:get_last_chain_address, fn _address -> - {"@Alice5", DateTime.utc_now()} - end) - - MockDB - |> stub(:get_transaction_chain, fn _address, _, _ -> + |> stub(:scan_chain, fn "@Alice0", _, _, _ -> {[ - %Transaction{address: "@Alice0"}, %Transaction{address: "@Alice1"}, %Transaction{address: "@Alice2"}, %Transaction{address: "@Alice3"}, - %Transaction{address: "@Alice4"}, - %Transaction{address: "@Alice5"} + %Transaction{address: "@Alice4"} ], false, nil} end) MockClient |> stub(:send_message, fn - _, %GetTransactionChain{address: "@Alice2", paging_state: "@Alice5"}, _ -> + _, %GetFirstAddress{address: "@Alice6"}, _ -> + {:ok, %FirstAddress{address: "@Alice0"}} + + _, %GetTransactionChain{address: "@Alice6", paging_state: "@Alice4"}, _ -> {:ok, %TransactionList{ transactions: [ + %Transaction{address: "@Alice5"}, %Transaction{address: "@Alice6"}, %Transaction{address: "@Alice7"} ] }} - - _, %GetTransactionChainLength{}, _ -> - %TransactionChainLength{length: 2} - - _, %GetFirstAddress{}, _ -> - {:ok, %FirstAddress{address: "@Alice0"}} end) assert {:ok, [ - %Transaction{address: "@Alice0"}, %Transaction{address: "@Alice1"}, %Transaction{address: "@Alice2"}, %Transaction{address: "@Alice3"}, %Transaction{address: "@Alice4"}, %Transaction{address: "@Alice5"}, - %Transaction{address: "@Alice6"}, - %Transaction{address: "@Alice7"} - ]} = Archethic.get_transaction_chain("@Alice2") - end - - test "should get_transaction_chain from network when GetFirstAddress fails" do - P2P.add_and_connect_node(%Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: Crypto.last_node_public_key(), - last_public_key: Crypto.last_node_public_key(), - network_patch: "AAA", - geo_patch: "AAA" - }) - - P2P.add_and_connect_node(%Node{ - ip: {127, 0, 0, 1}, - port: 3000, - first_public_key: "key1", - last_public_key: "key1", - network_patch: "AAA", - geo_patch: "AAA", - available?: true, - authorized?: true, - authorization_date: DateTime.utc_now() - }) - - MockClient - |> expect(:send_message, fn _, %GetFirstAddress{}, _ -> - {:ok, %NotFound{}} - end) - - MockDB - |> stub(:get_last_chain_address, fn address -> - address - end) - - MockClient - |> stub(:send_message, fn - _, %GetTransactionChain{address: "@Alice2", paging_state: nil}, _ -> - {:ok, - %TransactionList{ - transactions: [ - %Transaction{address: "@Alice0"}, - %Transaction{address: "@Alice1"}, - %Transaction{address: "@Alice2"}, - %Transaction{address: "@Alice3"}, - %Transaction{address: "@Alice4"}, - %Transaction{address: "@Alice5"} - ] - }} - - _, %GetTransactionChainLength{}, _ -> - %TransactionChainLength{length: 6} - - _, %GetFirstAddress{}, _ -> - {:ok, %NotFound{}} - end) - - assert {:ok, - [ - %Transaction{address: "@Alice0"}, - %Transaction{address: "@Alice1"}, - %Transaction{address: "@Alice2"}, - %Transaction{address: "@Alice3"}, - %Transaction{address: "@Alice4"}, - %Transaction{address: "@Alice5"} - ]} = Archethic.get_transaction_chain("@Alice2") + %Transaction{address: "@Alice6"} + ]} = Archethic.get_transaction_chain("@Alice6") end end diff --git a/test/support/template.ex b/test/support/template.ex index a91cfd3c0..5f11ce169 100644 --- a/test/support/template.ex +++ b/test/support/template.ex @@ -38,6 +38,7 @@ defmodule ArchethicCase do |> stub(:write_transaction_chain, fn _ -> :ok end) |> stub(:get_transaction, fn _, _ -> {:error, :transaction_not_exists} end) |> stub(:get_transaction_chain, fn _, _, _ -> {[], false, nil} end) + |> stub(:scan_chain, fn _, _, _, _ -> {[], false, nil} end) |> stub(:list_last_transaction_addresses, fn -> [] end) |> stub(:add_last_transaction_address, fn _, _, _ -> :ok end) |> stub(:get_last_chain_address, fn addr -> {addr, DateTime.utc_now()} end)