diff --git a/lib/archethic/replication.ex b/lib/archethic/replication.ex index 42aa384f7..892bbdd40 100644 --- a/lib/archethic/replication.ex +++ b/lib/archethic/replication.ex @@ -143,8 +143,6 @@ defmodule Archethic.Replication do # Stream the insertion of the chain tx |> stream_previous_chain(download_nodes) - |> Stream.reject(&Enum.empty?/1) - |> Stream.flat_map(& &1) |> Stream.each(fn tx = %Transaction{address: address} -> TransactionChain.write_transaction(tx) @@ -346,22 +344,19 @@ defmodule Archethic.Replication do previous_address = Transaction.previous_address(tx) if Transaction.network_type?(type) do - stream_network_tx_chain(tx, download_nodes) + stream_network_tx_chain(previous_address, download_nodes) else TransactionContext.stream_transaction_chain(previous_address, download_nodes) end end - defp stream_network_tx_chain(tx = %Transaction{}, download_nodes) do - previous_address = Transaction.previous_address(tx) - + defp stream_network_tx_chain(previous_address, download_nodes) do if TransactionChain.transaction_exists?(previous_address) do [] else Logger.debug( "Try to fetch network transaction chain from remote nodes (possibility of an orphan state)", - transaction_address: Base.encode16(previous_address), - transaction_type: tx.type + transaction_address: Base.encode16(previous_address) ) TransactionContext.stream_transaction_chain(previous_address, download_nodes) diff --git a/lib/archethic/replication/transaction_context.ex b/lib/archethic/replication/transaction_context.ex index 958ae313c..64732bf20 100644 --- a/lib/archethic/replication/transaction_context.ex +++ b/lib/archethic/replication/transaction_context.ex @@ -34,19 +34,16 @@ defmodule Archethic.Replication.TransactionContext do @spec stream_transaction_chain(address :: Crypto.versioned_hash(), list(Node.t())) :: Enumerable.t() | list(Transaction.t()) def stream_transaction_chain(address, node_list) when is_binary(address) do - storage_nodes = Election.chain_storage_nodes(address, node_list) - paging_address = TransactionChain.get_last_local_address(address) - - case storage_nodes do - [] -> - [] - - _ -> - if paging_address != address do - TransactionChain.stream_remotely(address, storage_nodes, paging_address) - else - [] - end + with storage_nodes <- Election.chain_storage_nodes(address, node_list), + {:ok, genesis_address} <- + TransactionChain.fetch_genesis_address_remotely(address, storage_nodes), + true <- address != genesis_address, + paging_address <- TransactionChain.get_last_stored_address(genesis_address), + true <- paging_address != address do + TransactionChain.stream_remotely(address, storage_nodes, paging_address) + |> Stream.take_while(&(Transaction.previous_address(&1) != address)) + else + _ -> [] end end diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 140c8f870..37bdc1699 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -741,10 +741,10 @@ defmodule Archethic.TransactionChain do defp do_stream_chain(nodes, address, paging_state, size) do case do_fetch_transaction_chain(nodes, address, paging_state) do {transactions, false, _} -> - {[transactions], {:end, size + length(transactions)}} + {transactions, {:end, size + length(transactions)}} {transactions, true, paging_state} -> - {[transactions], {address, paging_state, size + length(transactions)}} + {transactions, {address, paging_state, size + length(transactions)}} end end @@ -970,24 +970,11 @@ defmodule Archethic.TransactionChain do @doc """ Retrieve the last transaction address for a chain stored locally - It queries the the network for genesis address """ - @spec get_last_local_address(address :: binary()) :: binary() | nil - def get_last_local_address(address) when is_binary(address) do - case fetch_genesis_address_remotely(address) do - {:ok, genesis_address} -> - last_stored_address = get_last_stored_address(genesis_address) - - if last_stored_address == genesis_address, do: nil, else: last_stored_address - - _ -> - nil - end - end - - defp get_last_stored_address(genesis_address) do + @spec get_last_stored_address(genesis_address :: binary()) :: binary() | nil + def get_last_stored_address(genesis_address) do list_chain_addresses(genesis_address) - |> Enum.reduce_while(genesis_address, fn {address, _}, acc -> + |> Enum.reduce_while(nil, fn {address, _}, acc -> if transaction_exists?(address), do: {:cont, address}, else: {:halt, acc} end) end diff --git a/test/archethic/replication/transaction_context_test.exs b/test/archethic/replication/transaction_context_test.exs index d854f298c..6b1121db2 100644 --- a/test/archethic/replication/transaction_context_test.exs +++ b/test/archethic/replication/transaction_context_test.exs @@ -6,8 +6,6 @@ defmodule Archethic.Replication.TransactionContextTest do alias Archethic.Crypto alias Archethic.P2P - alias Archethic.P2P.Message.GetTransactionChainLength - alias Archethic.P2P.Message.TransactionChainLength alias Archethic.P2P.Message.GetTransaction alias Archethic.P2P.Message.GetTransactionChain alias Archethic.P2P.Message.GetTransactionInputs @@ -25,8 +23,8 @@ defmodule Archethic.Replication.TransactionContextTest do alias Archethic.TransactionChain.TransactionInput alias Archethic.TransactionChain.VersionedTransactionInput alias Archethic.P2P.Message.GetGenesisAddress - alias Archethic.P2P.Message.NotFound - # alias Archethic.P2P.Message.GenesisAddress + alias Archethic.P2P.Message.GenesisAddress + import Mox test "fetch_transaction/1 should retrieve the transaction" do @@ -52,16 +50,38 @@ defmodule Archethic.Replication.TransactionContextTest do end test "stream_transaction_chain/1 should retrieve the previous transaction chain" do + pub1 = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::bitstring>> + pub2 = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::bitstring>> + + addr1 = Crypto.derive_address(pub1) + addr2 = Crypto.derive_address(pub2) + + MockDB + |> stub(:list_chain_addresses, fn _ -> + [{addr1, DateTime.utc_now()}, {addr2, DateTime.utc_now()}] + end) + |> stub(:transaction_exists?, fn + ^addr1, _ -> true + _, _ -> false + end) + MockClient |> stub(:send_message, fn - _, %GetTransactionChain{}, _ -> - {:ok, %TransactionList{transactions: [%Transaction{}]}} - - _, %GetTransactionChainLength{}, _ -> - %TransactionChainLength{length: 1} + _, %GetTransactionChain{address: ^addr2, paging_state: ^addr1}, _ -> + {:ok, + %TransactionList{ + transactions: [ + %Transaction{ + previous_public_key: pub1 + }, + %Transaction{ + previous_public_key: pub2 + } + ] + }} _, %GetGenesisAddress{}, _ -> - {:ok, %NotFound{}} + {:ok, %GenesisAddress{address: "@Alice0"}} end) P2P.add_and_connect_node(%Node{ @@ -76,12 +96,15 @@ defmodule Archethic.Replication.TransactionContextTest do authorization_date: DateTime.utc_now() }) - assert 1 = - TransactionContext.stream_transaction_chain( - "@Alice1", - P2P.authorized_and_available_nodes() - ) - |> Enum.count() + chain = + TransactionContext.stream_transaction_chain(addr2, P2P.authorized_and_available_nodes()) + |> Enum.to_list() + + assert [ + %Transaction{ + previous_public_key: ^pub1 + } + ] = chain end test "fetch_transaction_inputs/2 should retrieve the inputs of a transaction at a given date" do diff --git a/test/archethic/transaction_chain_test.exs b/test/archethic/transaction_chain_test.exs index 2f89a310e..f130c4c15 100644 --- a/test/archethic/transaction_chain_test.exs +++ b/test/archethic/transaction_chain_test.exs @@ -233,15 +233,11 @@ defmodule Archethic.TransactionChainTest do %Transaction{} ] }} - - _, %GetTransactionChainLength{}, _ -> - %TransactionChainLength{length: 1} end) assert 1 = TransactionChain.stream_remotely("Alice1", nodes) |> Enum.to_list() - |> List.first() |> length() end @@ -314,7 +310,6 @@ defmodule Archethic.TransactionChainTest do assert 5 = TransactionChain.stream_remotely("Alice1", nodes) |> Enum.to_list() - |> List.first() |> length() end end