Skip to content

Commit

Permalink
Fix replication transaction chain ordering writing (#545)
Browse files Browse the repository at this point in the history
* Fix replication transaction chain ordering writing
* Add telemetry of full replication chain write
  • Loading branch information
samuelmanzanera committed Sep 9, 2022
1 parent 291ea2b commit ef14fef
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 21 deletions.
29 changes: 16 additions & 13 deletions lib/archethic/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ defmodule Archethic.Replication do
Enum.to_list(inputs)
) do
:ok ->
# Store the previous and the new one, as being the reference for next transactions
:ok = TransactionChain.write(Enum.filter([previous_tx, tx], & &1))
:ok = ingest_transaction(tx)
PubSub.notify_new_transaction(address, type, timestamp)

Expand All @@ -108,17 +106,22 @@ defmodule Archethic.Replication do
%{role: :chain}
)

if previous_tx do
# Load the rest of the chain asynchronously
Task.start(fn ->
# Stream the insertion of the chain
previous_tx
|> stream_previous_chain()
|> Stream.reject(&Enum.empty?/1)
|> Stream.each(&TransactionChain.write/1)
|> Stream.run()
end)
end
# Stream the insertion of the chain
tx
|> stream_previous_chain()
|> Stream.reject(&Enum.empty?/1)
|> Stream.each(&TransactionChain.write/1)
|> Stream.run()

TransactionChain.write_transaction(tx)

:telemetry.execute(
[:archethic, :replication, :full_write],
%{
duration: System.monotonic_time() - start_time
},
%{role: :chain}
)

:ok

Expand Down
7 changes: 6 additions & 1 deletion lib/archethic/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,12 @@ defmodule Archethic.Telemetry do
),
distribution("archethic.replication.validation.duration",
unit: {:native, :second},
reporter_options: [buckets: [0.01, 0.025, 0.05, 0.1, 0.3, 0.5, 0.8, 1, 1.5]],
reporter_options: [buckets: [0.01, 0.025, 0.05, 0.1, 0.3, 0.5, 0.8, 1, 1.5, 2, 3]],
measurement: :duration
),
distribution("archethic.replication.validation.full_write",
unit: {:native, :second},
reporter_options: [buckets: [0.01, 0.025, 0.05, 0.1, 0.3, 0.5, 0.8, 1, 1.5, 3, 5, 10]],
measurement: :duration
),
distribution("archethic.db.duration",
Expand Down
6 changes: 3 additions & 3 deletions test/archethic/bootstrap/network_init_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do
me = self()

MockDB
|> stub(:write_transaction_chain, fn [^tx] ->
|> stub(:write_transaction, fn ^tx ->
send(me, :write_transaction)
:ok
end)
Expand Down Expand Up @@ -261,7 +261,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do
me = self()

MockDB
|> expect(:write_transaction_chain, fn [tx] ->
|> expect(:write_transaction, fn tx ->
send(me, {:transaction, tx})
:ok
end)
Expand Down Expand Up @@ -413,7 +413,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do
me = self()

MockDB
|> expect(:write_transaction_chain, fn [tx] ->
|> expect(:write_transaction, fn tx ->
send(me, {:transaction, tx})
:ok
end)
Expand Down
6 changes: 5 additions & 1 deletion test/archethic/replication_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ defmodule Archethic.ReplicationTest do
tx = create_valid_transaction(unspent_outputs)

MockDB
|> expect(:write_transaction_chain, fn [^tx] ->
# |> stub(:write_transaction_chain, fn [^tx] ->
# send(me, :replicated)
# :ok
# end)
|> expect(:write_transaction, fn ^tx ->
send(me, :replicated)
:ok
end)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ defmodule Archethic.SelfRepair.Sync.TransactionHandlerTest do
end)

MockDB
|> stub(:write_transaction_chain, fn [^tx] ->
|> stub(:write_transaction, fn ^tx ->
send(me, :transaction_replicated)
:ok
end)
Expand Down
4 changes: 2 additions & 2 deletions test/archethic/self_repair/sync_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ defmodule Archethic.SelfRepair.SyncTest do
me = self()

MockDB
|> stub(:write_transaction_chain, fn [^tx] ->
|> stub(:write_transaction, fn ^tx ->
send(me, :storage)
:ok
end)
Expand Down Expand Up @@ -308,7 +308,7 @@ defmodule Archethic.SelfRepair.SyncTest do
me = self()

MockDB
|> stub(:write_transaction_chain, fn [^transfer_tx] ->
|> stub(:write_transaction, fn ^transfer_tx ->
send(me, :transaction_stored)
:ok
end)
Expand Down

0 comments on commit ef14fef

Please sign in to comment.