Skip to content

Commit

Permalink
Rework stream_previous_chain in replication
Browse files Browse the repository at this point in the history
  • Loading branch information
Neylix authored and samuelmanzanera committed Jan 17, 2023
1 parent 7c3a777 commit e60aa85
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 60 deletions.
11 changes: 3 additions & 8 deletions lib/archethic/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ defmodule Archethic.Replication do
# Stream the insertion of the chain
tx
|> stream_previous_chain(download_nodes)
|> Stream.reject(&Enum.empty?/1)
|> Stream.flat_map(& &1)
|> Stream.each(fn tx = %Transaction{address: address} ->
TransactionChain.write_transaction(tx)

Expand Down Expand Up @@ -346,22 +344,19 @@ defmodule Archethic.Replication do
previous_address = Transaction.previous_address(tx)

if Transaction.network_type?(type) do
stream_network_tx_chain(tx, download_nodes)
stream_network_tx_chain(previous_address, download_nodes)
else
TransactionContext.stream_transaction_chain(previous_address, download_nodes)
end
end

defp stream_network_tx_chain(tx = %Transaction{}, download_nodes) do
previous_address = Transaction.previous_address(tx)

defp stream_network_tx_chain(previous_address, download_nodes) do
if TransactionChain.transaction_exists?(previous_address) do
[]
else
Logger.debug(
"Try to fetch network transaction chain from remote nodes (possibility of an orphan state)",
transaction_address: Base.encode16(previous_address),
transaction_type: tx.type
transaction_address: Base.encode16(previous_address)
)

TransactionContext.stream_transaction_chain(previous_address, download_nodes)
Expand Down
23 changes: 10 additions & 13 deletions lib/archethic/replication/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,16 @@ defmodule Archethic.Replication.TransactionContext do
@spec stream_transaction_chain(address :: Crypto.versioned_hash(), list(Node.t())) ::
Enumerable.t() | list(Transaction.t())
def stream_transaction_chain(address, node_list) when is_binary(address) do
storage_nodes = Election.chain_storage_nodes(address, node_list)
paging_address = TransactionChain.get_last_local_address(address)

case storage_nodes do
[] ->
[]

_ ->
if paging_address != address do
TransactionChain.stream_remotely(address, storage_nodes, paging_address)
else
[]
end
with storage_nodes <- Election.chain_storage_nodes(address, node_list),
{:ok, genesis_address} <-
TransactionChain.fetch_genesis_address_remotely(address, storage_nodes),
true <- address != genesis_address,
paging_address <- TransactionChain.get_last_stored_address(genesis_address),
true <- paging_address != address do
TransactionChain.stream_remotely(address, storage_nodes, paging_address)
|> Stream.take_while(&(Transaction.previous_address(&1) != address))
else
_ -> []
end
end

Expand Down
23 changes: 5 additions & 18 deletions lib/archethic/transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -741,10 +741,10 @@ defmodule Archethic.TransactionChain do
defp do_stream_chain(nodes, address, paging_state, size) do
case do_fetch_transaction_chain(nodes, address, paging_state) do
{transactions, false, _} ->
{[transactions], {:end, size + length(transactions)}}
{transactions, {:end, size + length(transactions)}}

{transactions, true, paging_state} ->
{[transactions], {address, paging_state, size + length(transactions)}}
{transactions, {address, paging_state, size + length(transactions)}}
end
end

Expand Down Expand Up @@ -970,24 +970,11 @@ defmodule Archethic.TransactionChain do

@doc """
Retrieve the last transaction address for a chain stored locally
It queries the the network for genesis address
"""
@spec get_last_local_address(address :: binary()) :: binary() | nil
def get_last_local_address(address) when is_binary(address) do
case fetch_genesis_address_remotely(address) do
{:ok, genesis_address} ->
last_stored_address = get_last_stored_address(genesis_address)

if last_stored_address == genesis_address, do: nil, else: last_stored_address

_ ->
nil
end
end

defp get_last_stored_address(genesis_address) do
@spec get_last_stored_address(genesis_address :: binary()) :: binary() | nil
def get_last_stored_address(genesis_address) do
list_chain_addresses(genesis_address)
|> Enum.reduce_while(genesis_address, fn {address, _}, acc ->
|> Enum.reduce_while(nil, fn {address, _}, acc ->
if transaction_exists?(address), do: {:cont, address}, else: {:halt, acc}
end)
end
Expand Down
55 changes: 39 additions & 16 deletions test/archethic/replication/transaction_context_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ defmodule Archethic.Replication.TransactionContextTest do
alias Archethic.Crypto

alias Archethic.P2P
alias Archethic.P2P.Message.GetTransactionChainLength
alias Archethic.P2P.Message.TransactionChainLength
alias Archethic.P2P.Message.GetTransaction
alias Archethic.P2P.Message.GetTransactionChain
alias Archethic.P2P.Message.GetTransactionInputs
Expand All @@ -25,8 +23,8 @@ defmodule Archethic.Replication.TransactionContextTest do
alias Archethic.TransactionChain.TransactionInput
alias Archethic.TransactionChain.VersionedTransactionInput
alias Archethic.P2P.Message.GetGenesisAddress
alias Archethic.P2P.Message.NotFound
# alias Archethic.P2P.Message.GenesisAddress
alias Archethic.P2P.Message.GenesisAddress

import Mox

test "fetch_transaction/1 should retrieve the transaction" do
Expand All @@ -52,16 +50,38 @@ defmodule Archethic.Replication.TransactionContextTest do
end

test "stream_transaction_chain/1 should retrieve the previous transaction chain" do
pub1 = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::bitstring>>
pub2 = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::bitstring>>

addr1 = Crypto.derive_address(pub1)
addr2 = Crypto.derive_address(pub2)

MockDB
|> stub(:list_chain_addresses, fn _ ->
[{addr1, DateTime.utc_now()}, {addr2, DateTime.utc_now()}]
end)
|> stub(:transaction_exists?, fn
^addr1, _ -> true
_, _ -> false
end)

MockClient
|> stub(:send_message, fn
_, %GetTransactionChain{}, _ ->
{:ok, %TransactionList{transactions: [%Transaction{}]}}

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}
_, %GetTransactionChain{address: ^addr2, paging_state: ^addr1}, _ ->
{:ok,
%TransactionList{
transactions: [
%Transaction{
previous_public_key: pub1
},
%Transaction{
previous_public_key: pub2
}
]
}}

_, %GetGenesisAddress{}, _ ->
{:ok, %NotFound{}}
{:ok, %GenesisAddress{address: "@Alice0"}}
end)

P2P.add_and_connect_node(%Node{
Expand All @@ -76,12 +96,15 @@ defmodule Archethic.Replication.TransactionContextTest do
authorization_date: DateTime.utc_now()
})

assert 1 =
TransactionContext.stream_transaction_chain(
"@Alice1",
P2P.authorized_and_available_nodes()
)
|> Enum.count()
chain =
TransactionContext.stream_transaction_chain(addr2, P2P.authorized_and_available_nodes())
|> Enum.to_list()

assert [
%Transaction{
previous_public_key: ^pub1
}
] = chain
end

test "fetch_transaction_inputs/2 should retrieve the inputs of a transaction at a given date" do
Expand Down
5 changes: 0 additions & 5 deletions test/archethic/transaction_chain_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,11 @@ defmodule Archethic.TransactionChainTest do
%Transaction{}
]
}}

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}
end)

assert 1 =
TransactionChain.stream_remotely("Alice1", nodes)
|> Enum.to_list()
|> List.first()
|> length()
end

Expand Down Expand Up @@ -314,7 +310,6 @@ defmodule Archethic.TransactionChainTest do
assert 5 =
TransactionChain.stream_remotely("Alice1", nodes)
|> Enum.to_list()
|> List.first()
|> length()
end
end
Expand Down

0 comments on commit e60aa85

Please sign in to comment.