diff --git a/lib/archethic/db.ex b/lib/archethic/db.ex index ab7859863b..b638bd9aac 100644 --- a/lib/archethic/db.ex +++ b/lib/archethic/db.ex @@ -42,7 +42,7 @@ defmodule Archethic.DB do @callback list_addresses_by_type(Transaction.transaction_type()) :: Enumerable.t() | list(binary()) @callback list_chain_addresses(binary()) :: - Enumerable.t() | list({binary(), non_neg_integer()}) + Enumerable.t() | list({binary(), DateTime.t()}) @callback get_last_chain_address(binary()) :: {binary(), DateTime.t()} @callback get_last_chain_address(binary(), DateTime.t()) :: {binary(), DateTime.t()} diff --git a/lib/archethic/db/embedded_impl.ex b/lib/archethic/db/embedded_impl.ex index d6919fed7f..0136e78383 100644 --- a/lib/archethic/db/embedded_impl.ex +++ b/lib/archethic/db/embedded_impl.ex @@ -216,7 +216,7 @@ defmodule Archethic.DB.EmbeddedImpl do Stream all the addresses from the Genesis address(following it). """ @spec list_chain_addresses(binary()) :: - Enumerable.t() | list({binary(), non_neg_integer()}) + Enumerable.t() | list({binary(), DateTime.t()}) def list_chain_addresses(address) when is_binary(address) do ChainIndex.list_chain_addresses(address, db_path()) end @@ -249,16 +249,16 @@ defmodule Archethic.DB.EmbeddedImpl do end @doc """ - Reference a last address from a previous address + Reference a last address from a genesis address """ @spec add_last_transaction_address( - previous_address :: binary(), + genesis_address :: binary(), address :: binary(), tx_time :: DateTime.t() ) :: :ok - def add_last_transaction_address(previous_address, address, date = %DateTime{}) - when is_binary(previous_address) and is_binary(address) do - ChainIndex.set_last_chain_address(previous_address, address, date, db_path()) + def add_last_transaction_address(genesis_address, address, date = %DateTime{}) + when is_binary(genesis_address) and is_binary(address) do + ChainIndex.set_last_chain_address(genesis_address, address, date, db_path()) end @doc """ diff --git a/lib/archethic/db/embedded_impl/chain_index.ex b/lib/archethic/db/embedded_impl/chain_index.ex index 4d042c40d8..82375d4e18 100644 --- a/lib/archethic/db/embedded_impl/chain_index.ex +++ b/lib/archethic/db/embedded_impl/chain_index.ex @@ -304,7 +304,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do Stream all the transaction addresses from genesis_address-address. """ @spec list_chain_addresses(binary(), String.t()) :: - Enumerable.t() | list({binary(), non_neg_integer()}) + Enumerable.t() | list({binary(), DateTime.t()}) def list_chain_addresses(address, db_path) when is_binary(address) do filepath = chain_addresses_path(db_path, address) @@ -321,7 +321,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do {:ok, hash} <- :file.read(fd, hash_size) do address = <> # return tuple of address and timestamp - {[{address, timestamp}], {:ok, fd}} + {[{address, DateTime.from_unix!(timestamp, :millisecond)}], {:ok, fd}} else :eof -> :file.close(fd) diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index e3aaec18d8..e3005a70e8 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -45,6 +45,7 @@ defmodule Archethic.P2P.Message do alias __MODULE__.{ AcknowledgeStorage, AddMiningContext, + AddressList, Balance, BeaconSummaryList, BeaconUpdate, @@ -65,6 +66,7 @@ defmodule Archethic.P2P.Message do GetFirstPublicKey, GetLastTransaction, GetLastTransactionAddress, + GetNextAddresses, GetP2PView, GetStorageNonce, GetTransaction, @@ -141,6 +143,7 @@ defmodule Archethic.P2P.Message do | GetBeaconSummariesAggregate.t() | NotifyPreviousChain.t() | ShardRepair.t() + | GetNextAddresses.t() @type response :: Ok.t() @@ -165,6 +168,7 @@ defmodule Archethic.P2P.Message do | FirstAddress.t() | ReplicationError.t() | SummaryAggregate.t() + | AddressList.t() @floor_upload_speed Application.compile_env!(:archethic, [__MODULE__, :floor_upload_speed]) @content_max_size Application.compile_env!(:archethic, :transaction_data_content_max_size) @@ -434,6 +438,14 @@ defmodule Archethic.P2P.Message do <<34::8, address::binary>> end + def encode(msg = %GetNextAddresses{}) do + <<35::8, GetNextAddresses.serialize(msg)::bitstring>> + end + + def encode(msg = %AddressList{}) do + <<229::8, AddressList.serialize(msg)::bitstring>> + end + def encode(msg = %ShardRepair{}) do <<230::8, ShardRepair.serialize(msg)::bitstring>> end @@ -972,6 +984,14 @@ defmodule Archethic.P2P.Message do {%NotifyPreviousChain{address: address}, rest} end + def decode(<<35::8, rest::bitstring>>) do + GetNextAddresses.deserialize(rest) + end + + def decode(<<229::8, rest::bitstring>>) do + AddressList.deserialize(rest) + end + def decode(<<230::8, rest::bitstring>>) do ShardRepair.deserialize(rest) end @@ -1873,6 +1893,23 @@ defmodule Archethic.P2P.Message do %Ok{} end + def process(%GetNextAddresses{address: address}, _) do + case TransactionChain.get_transaction(address, validation_stamp: [:timestamp]) do + {:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: address_timestamp}}} -> + addresses = + TransactionChain.get_genesis_address(address) + |> TransactionChain.list_chain_addresses() + |> Enum.filter(fn {_address, timestamp} -> + DateTime.compare(timestamp, address_timestamp) == :gt + end) + + %AddressList{addresses: addresses} + + _ -> + %AddressList{addresses: []} + end + end + defp process_replication_chain(tx, replying_node_public_key) do Task.Supervisor.start_child(TaskSupervisor, fn -> response = diff --git a/lib/archethic/p2p/message/address_list.ex b/lib/archethic/p2p/message/address_list.ex new file mode 100644 index 0000000000..c6bceb0bfb --- /dev/null +++ b/lib/archethic/p2p/message/address_list.ex @@ -0,0 +1,63 @@ +defmodule Archethic.P2P.Message.AddressList do + @moduledoc """ + Inform a shard to start repair. + """ + @enforce_keys [:addresses] + defstruct [:addresses] + + alias Archethic.Crypto + + alias Archethic.Utils + alias Archethic.Utils.VarInt + + @type t :: %__MODULE__{addresses: list(Crypto.prepended_hash())} + + @doc """ + Serialize AddressList Struct + + iex> %AddressList{ + ...> addresses: [<<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>, + ...> <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>] + ...> } |> AddressList.serialize() + # VarInt + <<1, 2, + # Addresses + 0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251, + 0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>> + """ + def serialize(%__MODULE__{addresses: addresses}) do + <> + end + + @doc """ + Deserialize AddressList Struct + + iex> # VarInt + ...> <<1, 2, + ...> # Addresses + ...> 0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251, + ...> 0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>> + ...> |> AddressList.deserialize() + { + %AddressList{ + addresses: [<<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>, + <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>] + }, ""} + + """ + def deserialize(bin) do + {addresses_length, rest} = VarInt.get_value(bin) + + {addresses, rest} = Utils.deserialize_addresses(rest, addresses_length, []) + + {%__MODULE__{addresses: addresses}, rest} + end +end diff --git a/lib/archethic/p2p/message/get_next_addresses.ex b/lib/archethic/p2p/message/get_next_addresses.ex new file mode 100644 index 0000000000..87e3e3c22b --- /dev/null +++ b/lib/archethic/p2p/message/get_next_addresses.ex @@ -0,0 +1,46 @@ +defmodule Archethic.P2P.Message.GetNextAddresses do + @moduledoc """ + Inform a shard to start repair. + """ + @enforce_keys [:address] + defstruct [:address] + + alias Archethic.Crypto + + alias Archethic.Utils + + @type t :: %__MODULE__{address: Crypto.prepended_hash()} + + @doc """ + Serialize GetNextAddresses Struct + + iex> %GetNextAddresses{ + ...> address: <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>> + ...> } |> GetNextAddresses.serialize() + <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>> + """ + def serialize(%__MODULE__{address: address}) do + <> + end + + @doc """ + Deserialize GetNextAddresses Struct + + iex> <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + ...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>> + ...> |> GetNextAddresses.deserialize() + { + %GetNextAddresses{ + address: <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172, + 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>> + }, ""} + + """ + def deserialize(bin) do + {address, rest} = Utils.deserialize_address(bin) + + {%__MODULE__{address: address}, rest} + end +end diff --git a/lib/archethic/self_repair/repair_worker.ex b/lib/archethic/self_repair/repair_worker.ex index 33ab1ec6d3..bfc7531e2f 100644 --- a/lib/archethic/self_repair/repair_worker.ex +++ b/lib/archethic/self_repair/repair_worker.ex @@ -2,6 +2,7 @@ defmodule Archethic.SelfRepair.RepairWorker do @moduledoc false alias Archethic.{ + Contracts, BeaconChain, Election, P2P, @@ -32,6 +33,9 @@ defmodule Archethic.SelfRepair.RepairWorker do address: Base.encode16(first_address) ) + # We get the authorized nodes before the last summary date as we are sure that they know + # the informations we need. Requesting current nodes may ask information to nodes in same repair + # process as we are here. authorized_nodes = DateTime.utc_now() |> BeaconChain.previous_summary_time() @@ -124,9 +128,14 @@ defmodule Archethic.SelfRepair.RepairWorker do with false <- TransactionChain.transaction_exists?(address), storage_nodes <- Election.chain_storage_nodes(address, authorized_nodes), {:ok, tx} <- TransactionChain.fetch_transaction_remotely(address, storage_nodes) do - if storage?, - do: Replication.validate_and_store_transaction_chain(tx, true, authorized_nodes), - else: Replication.validate_and_store_transaction(tx, true) + if storage? do + case Replication.validate_and_store_transaction_chain(tx, true, authorized_nodes) do + :ok -> update_last_address(address, authorized_nodes) + error -> error + end + else + Replication.validate_and_store_transaction(tx, true) + end else true -> Logger.debug("Notifier RepairWorker transaction already exists", @@ -139,4 +148,37 @@ defmodule Archethic.SelfRepair.RepairWorker do ) end end + + @doc """ + Request missing transaction addresses from last local address until last chain address + and add them in the DB + """ + def update_last_address(address, authorized_nodes) do + # As the node is storage node of this chain, it needs to know all the addresses of the chain until the last + # So we get the local last address and verify if it's the same as the last address of the chain + # by requesting the nodes which already know the last address + + {last_local_address, _timestamp} = TransactionChain.get_last_address(address) + storage_nodes = Election.storage_nodes(last_local_address, authorized_nodes) + + case TransactionChain.fetch_next_chain_addresses_remotely(last_local_address, storage_nodes) do + {:ok, []} -> + :ok + + {:ok, addresses} -> + genesis_address = TransactionChain.get_genesis_address(address) + + addresses + |> Enum.sort_by(fn {_address, timestamp} -> timestamp end) + |> Enum.each(fn {address, timestamp} -> + TransactionChain.register_last_address(genesis_address, address, timestamp) + end) + + # Stop potential previous smart contract + Contracts.stop_contract(address) + + _ -> + :ok + end + end end diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 9b108960d3..34fb16a186 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -10,25 +10,27 @@ defmodule Archethic.TransactionChain do alias Archethic.Election alias Archethic.P2P - alias Archethic.P2P.Node alias Archethic.P2P.Message + alias Archethic.P2P.Node alias Archethic.P2P.Message.{ + AddressList, Error, - NotFound, - TransactionList, - UnspentOutputList, - TransactionInputList, - TransactionChainLength, - LastTransactionAddress, FirstAddress, - GetTransaction, GetFirstAddress, - GetUnspentOutputs, + GetLastTransactionAddress, + GetNextAddresses, + GetTransaction, GetTransactionChain, + GetTransactionChainLength, GetTransactionInputs, - GetLastTransactionAddress, - GetTransactionChainLength + GetUnspentOutputs, + LastTransactionAddress, + NotFound, + TransactionChainLength, + TransactionInputList, + TransactionList, + UnspentOutputList } alias __MODULE__.MemTables.KOLedger @@ -78,7 +80,7 @@ defmodule Archethic.TransactionChain do @doc """ Stream all the addresses in chronological belonging to a genesis address """ - @spec list_chain_addresses(binary()) :: Enumerable.t() | list({binary(), non_neg_integer()}) + @spec list_chain_addresses(binary()) :: Enumerable.t() | list({binary(), DateTime.t()}) defdelegate list_chain_addresses(genesis_address), to: DB @doc """ @@ -98,10 +100,10 @@ defmodule Archethic.TransactionChain do as: :get_last_chain_address @doc """ - Register a last address from a previous address at a given date + Register a last address from a genesis address at a given date """ @spec register_last_address(binary(), binary(), DateTime.t()) :: :ok - defdelegate register_last_address(previous_address, next_address, timestamp), + defdelegate register_last_address(genesis_address, next_address, timestamp), to: DB, as: :add_last_transaction_address @@ -599,6 +601,29 @@ defmodule Archethic.TransactionChain do end end + @doc """ + Request the chain addresses from paging address to last chain address + """ + @spec fetch_next_chain_addresses_remotely(Crypto.prepended_hash(), list(Node.t())) :: + {:ok, list(Crypto.prepended_hash())} | {:error, :network_issue} + def fetch_next_chain_addresses_remotely(address, nodes) do + conflict_resolver = fn results -> + Enum.sort_by(results, &length(&1.addresses), :desc) |> List.first() + end + + case P2P.quorum_read( + nodes, + %GetNextAddresses{address: address}, + conflict_resolver + ) do + {:ok, %AddressList{addresses: addresses}} -> + {:ok, addresses} + + {:error, :network_issue} = e -> + e + end + end + @doc """ Get a transaction summary from a transaction address """ diff --git a/lib/archethic_web/live/chains/node_shared_secrets_live.ex b/lib/archethic_web/live/chains/node_shared_secrets_live.ex index be99f34fc5..7aa9e5b2ca 100644 --- a/lib/archethic_web/live/chains/node_shared_secrets_live.ex +++ b/lib/archethic_web/live/chains/node_shared_secrets_live.ex @@ -153,7 +153,7 @@ defmodule ArchethicWeb.NodeSharedSecretsChainLive do display_data( addr, nb_authorized_nodes, - DateTime.from_unix(timestamp, :millisecond) |> elem(1) + timestamp ) end) |> Enum.reverse() diff --git a/lib/archethic_web/live/chains/reward_live.ex b/lib/archethic_web/live/chains/reward_live.ex index f46942d977..a6b41c1224 100644 --- a/lib/archethic_web/live/chains/reward_live.ex +++ b/lib/archethic_web/live/chains/reward_live.ex @@ -132,7 +132,7 @@ defmodule ArchethicWeb.RewardChainLive do display_data( addr, (TransactionChain.get_transaction(addr, [:type]) |> elem(1)).type, - DateTime.from_unix(timestamp, :millisecond) |> elem(1) + timestamp ) end) |> Enum.reverse() diff --git a/test/archethic/p2p/message/address_list_test.exs b/test/archethic/p2p/message/address_list_test.exs new file mode 100644 index 0000000000..ad2f699cf7 --- /dev/null +++ b/test/archethic/p2p/message/address_list_test.exs @@ -0,0 +1,7 @@ +defmodule Archethic.P2P.Message.AddressListTest do + @moduledoc false + use ExUnit.Case + + alias Archethic.P2P.Message.AddressList + doctest AddressList +end diff --git a/test/archethic/p2p/message/get_next_addresses_test.exs b/test/archethic/p2p/message/get_next_addresses_test.exs new file mode 100644 index 0000000000..9df5e01fae --- /dev/null +++ b/test/archethic/p2p/message/get_next_addresses_test.exs @@ -0,0 +1,7 @@ +defmodule Archethic.P2P.Message.GetNextAddressesTest do + @moduledoc false + use ExUnit.Case + + alias Archethic.P2P.Message.GetNextAddresses + doctest GetNextAddresses +end diff --git a/test/archethic/replication_test.exs b/test/archethic/replication_test.exs index a62e14be72..fa95916df8 100644 --- a/test/archethic/replication_test.exs +++ b/test/archethic/replication_test.exs @@ -255,7 +255,7 @@ defmodule Archethic.ReplicationTest do |> expect(:get_transaction, fn _, _ -> {:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: DateTime.utc_now()}}} end) - |> expect(:list_chain_addresses, fn _ -> [{"@Alice1", 0}] end) + |> expect(:list_chain_addresses, fn _ -> [{"@Alice1", DateTime.utc_now()}] end) MockClient |> stub(:send_message, fn _, diff --git a/test/archethic/self_repair/repair_worker_test.exs b/test/archethic/self_repair/repair_worker_test.exs index 3ef8130460..487520ae2d 100644 --- a/test/archethic/self_repair/repair_worker_test.exs +++ b/test/archethic/self_repair/repair_worker_test.exs @@ -4,12 +4,19 @@ defmodule Archethic.SelfRepair.RepairWorkerTest do alias Archethic.BeaconChain.SummaryTimer + alias Archethic.Crypto + alias Archethic.P2P + alias Archethic.P2P.Client.DefaultImpl alias Archethic.P2P.Node + alias Archethic.P2P.Message.GetNextAddresses alias Archethic.P2P.Message.GetTransaction alias Archethic.SelfRepair.RepairWorker + alias Archethic.TransactionChain.Transaction + alias Archethic.TransactionChain.Transaction.ValidationStamp + import Mox setup do @@ -106,4 +113,52 @@ defmodule Archethic.SelfRepair.RepairWorkerTest do task: _task_pid } = :sys.get_state(pid) end + + test "update_last_address/1 should request missing addresses and add them in DB" do + node = %Node{ + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.last_node_public_key(), + geo_patch: "AAA", + authorized?: true, + authorization_date: ~U[2022-11-27 00:00:00Z], + available?: true, + availability_history: <<1::1>> + } + + me = self() + + MockDB + |> expect(:get_last_chain_address, fn "Alice2" -> {"Alice2", ~U[2022-11-27 00:10:00Z]} end) + |> expect(:get_transaction, fn "Alice2", _ -> + {:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: ~U[2022-11-27 00:10:00Z]}}} + end) + |> expect(:get_first_chain_address, 2, fn "Alice2" -> "Alice0" end) + |> expect(:list_chain_addresses, fn "Alice0" -> + [ + {"Alice1", ~U[2022-11-27 00:09:00Z]}, + {"Alice2", ~U[2022-11-27 00:10:00Z]}, + {"Alice3", ~U[2022-11-27 00:11:00Z]}, + {"Alice4", ~U[2022-11-27 00:12:00Z]} + ] + end) + |> expect(:add_last_transaction_address, 2, fn + "Alice0", "Alice3", ~U[2022-11-27 00:11:00Z] -> + send(me, :add_alice3) + + "Alice0", "Alice4", ~U[2022-11-27 00:12:00Z] -> + send(me, :add_alice4) + end) + + MockClient + |> expect(:send_message, fn node, msg = %GetNextAddresses{address: "Alice2"}, timeout -> + send(me, :get_next_addresses) + DefaultImpl.send_message(node, msg, timeout) + end) + + RepairWorker.update_last_address("Alice2", [node]) + + assert_receive :get_next_addresses + assert_receive :add_alice3 + assert_receive :add_alice4 + end end diff --git a/test/archethic_web/live/rewards_live_test.exs b/test/archethic_web/live/rewards_live_test.exs index 639957eb72..4d64158eb1 100644 --- a/test/archethic_web/live/rewards_live_test.exs +++ b/test/archethic_web/live/rewards_live_test.exs @@ -37,7 +37,6 @@ defmodule ArchethicWeb.RewardsLiveTest do time = DateTime.utc_now() |> DateTime.add(3600 * index, :second) - |> DateTime.to_unix() {address, time} end)