Skip to content

Commit

Permalink
Fix replication chain ingestion (#821)
Browse files Browse the repository at this point in the history
* Ingest transaction chain in replication
* Remove unused write_tranaction_chain function
  • Loading branch information
Neylix committed Jan 11, 2023
1 parent 7354d91 commit eaf9ed1
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 188 deletions.
1 change: 0 additions & 1 deletion lib/archethic/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ defmodule Archethic.DB do
@callback write_beacon_summary(Summary.t()) :: :ok
@callback clear_beacon_summaries() :: :ok
@callback write_beacon_summaries_aggregate(SummaryAggregate.t()) :: :ok
@callback write_transaction_chain(Enumerable.t()) :: :ok
@callback list_transactions(fields :: list()) :: Enumerable.t()
@callback list_io_transactions(fields :: list()) :: Enumerable.t()
@callback add_last_transaction_address(binary(), binary(), DateTime.t()) :: :ok
Expand Down
39 changes: 0 additions & 39 deletions lib/archethic/db/embedded_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,6 @@ defmodule Archethic.DB.EmbeddedImpl do
end
end

@doc """
Write the transaction chain through the a chain writer which will
append the transactions to the chain's file
If a transaction already exists it will be discarded
Indexes will then be filled with the relative transactions
"""
@spec write_transaction_chain(list(Transaction.t())) :: :ok
def write_transaction_chain(chain) do
sorted_chain = Enum.sort_by(chain, & &1.validation_stamp.timestamp, {:asc, DateTime})

previous_address =
List.first(sorted_chain)
|> Transaction.previous_address()

genesis_address =
case ChainIndex.get_tx_entry(previous_address, db_path()) do
{:ok, %{genesis_address: genesis_address}} ->
genesis_address

_ ->
previous_address
end

do_write_transaction_chain(genesis_address, sorted_chain)
end

defp do_write_transaction_chain(genesis_address, sorted_chain) do
Enum.each(sorted_chain, fn tx ->
unless ChainIndex.transaction_exists?(tx.address, db_path()) do
ChainWriter.append_transaction(genesis_address, tx)

# Delete IO transaction if it exists as it is now stored as a chain
delete_io_transaction(tx.address)
end
end)
end

@doc """
Write a single transaction and append it to its chain
"""
Expand Down
16 changes: 15 additions & 1 deletion lib/archethic/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,21 @@ defmodule Archethic.Replication do
tx
|> stream_previous_chain(download_nodes)
|> Stream.reject(&Enum.empty?/1)
|> Stream.each(&TransactionChain.write/1)
|> Stream.flat_map(& &1)
|> Stream.each(fn tx = %Transaction{address: address} ->
TransactionChain.write_transaction(tx)

# There is some case where a transaction is not replicated while it should
# because of some latency or network issue. So when we replicate a past chain
# we also ingest the transaction if we are storage node of it

storage_node? =
address
|> Election.chain_storage_nodes(download_nodes)
|> Utils.key_in_node_list?(Crypto.first_node_public_key())

if storage_node?, do: ingest_transaction(tx, false)
end)
|> Stream.run()

TransactionChain.write_transaction(tx)
Expand Down
21 changes: 0 additions & 21 deletions lib/archethic/transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -197,27 +197,6 @@ defmodule Archethic.TransactionChain do
)
end

@doc """
Persist a new transaction chain
"""
@spec write(Enumerable.t()) :: :ok
def write(chain) do
sorted_chain = Enum.sort_by(chain, & &1.validation_stamp.timestamp, {:desc, DateTime})

%Transaction{
address: tx_address,
type: tx_type
} = Enum.at(sorted_chain, 0)

DB.write_transaction_chain(sorted_chain)
Enum.each(sorted_chain, &KOLedger.remove_transaction(&1.address))

Logger.info("Transaction Chain stored",
transaction_address: Base.encode16(tx_address),
transaction_type: tx_type
)
end

@doc """
Write an invalid transaction
"""
Expand Down
22 changes: 7 additions & 15 deletions test/archethic/bootstrap_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ defmodule Archethic.BootstrapTest do
start_supervised!({NodeRenewalScheduler, interval: "0 * * * * * *"})

MockDB
|> stub(:write_transaction_chain, fn _ -> :ok end)
|> stub(:write_transaction, fn _, _ -> :ok end)

MockDB
|> stub(:list_transactions_by_type, fn :mint_rewards, [:address, :type] ->
Expand Down Expand Up @@ -262,7 +262,7 @@ defmodule Archethic.BootstrapTest do
}

validated_tx = %{tx | validation_stamp: stamp}
:ok = TransactionChain.write([validated_tx])
:ok = TransactionChain.write_transaction(validated_tx)
:ok = Replication.ingest_transaction(validated_tx, false)

{:ok, %Ok{}}
Expand Down Expand Up @@ -560,13 +560,9 @@ defmodule Archethic.BootstrapTest do
|> expect(:get_transaction, fn ^addr3, _ ->
{:error, :transaction_not_exists}
end)
|> expect(:write_transaction_chain, fn
txn_list ->
# to know this fx executed or not we use send
send(me, {:write_transaction_chain, txn_list})
:ok
end)
|> expect(:write_transaction, fn _tx, _ ->
|> stub(:write_transaction, fn tx, _ ->
# to know this fx executed or not we use send
send(me, {:write_transaction, tx.address})
:ok
end)

Expand Down Expand Up @@ -616,12 +612,8 @@ defmodule Archethic.BootstrapTest do
# &TransactionChain.write/1 <- stream_remotely(addr3,addr2) <- get_last_address(locally)
# |
# write_transaction(tx4) -> ingest txn4
assert_receive({:write_transaction_chain, txn_list})

assert [] ==
Enum.reduce(txn_list, [addr4, addr3], fn tx, acc ->
acc -- [tx.address]
end)
assert_receive({:write_transaction, ^addr3})
assert_receive({:write_transaction, ^addr4})
end

defp p2p_context() do
Expand Down
Loading

0 comments on commit eaf9ed1

Please sign in to comment.