Skip to content

Commit

Permalink
Request missing chain addresses while new transaction stored
Browse files Browse the repository at this point in the history
  • Loading branch information
Neylix committed Nov 28, 2022
1 parent 4fd6e4e commit 497d951
Show file tree
Hide file tree
Showing 15 changed files with 311 additions and 30 deletions.
2 changes: 1 addition & 1 deletion lib/archethic/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule Archethic.DB do
@callback list_addresses_by_type(Transaction.transaction_type()) ::
Enumerable.t() | list(binary())
@callback list_chain_addresses(binary()) ::
Enumerable.t() | list({binary(), non_neg_integer()})
Enumerable.t() | list({binary(), DateTime.t()})

@callback get_last_chain_address(binary()) :: {binary(), DateTime.t()}
@callback get_last_chain_address(binary(), DateTime.t()) :: {binary(), DateTime.t()}
Expand Down
12 changes: 6 additions & 6 deletions lib/archethic/db/embedded_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ defmodule Archethic.DB.EmbeddedImpl do
Stream all the addresses from the Genesis address(following it).
"""
@spec list_chain_addresses(binary()) ::
Enumerable.t() | list({binary(), non_neg_integer()})
Enumerable.t() | list({binary(), DateTime.t()})
def list_chain_addresses(address) when is_binary(address) do
ChainIndex.list_chain_addresses(address, db_path())
end
Expand Down Expand Up @@ -249,16 +249,16 @@ defmodule Archethic.DB.EmbeddedImpl do
end

@doc """
Reference a last address from a previous address
Reference a last address from a genesis address
"""
@spec add_last_transaction_address(
previous_address :: binary(),
genesis_address :: binary(),
address :: binary(),
tx_time :: DateTime.t()
) :: :ok
def add_last_transaction_address(previous_address, address, date = %DateTime{})
when is_binary(previous_address) and is_binary(address) do
ChainIndex.set_last_chain_address(previous_address, address, date, db_path())
def add_last_transaction_address(genesis_address, address, date = %DateTime{})
when is_binary(genesis_address) and is_binary(address) do
ChainIndex.set_last_chain_address(genesis_address, address, date, db_path())
end

@doc """
Expand Down
4 changes: 2 additions & 2 deletions lib/archethic/db/embedded_impl/chain_index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
Stream all the transaction addresses from genesis_address-address.
"""
@spec list_chain_addresses(binary(), String.t()) ::
Enumerable.t() | list({binary(), non_neg_integer()})
Enumerable.t() | list({binary(), DateTime.t()})
def list_chain_addresses(address, db_path) when is_binary(address) do
filepath = chain_addresses_path(db_path, address)

Expand All @@ -321,7 +321,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
{:ok, hash} <- :file.read(fd, hash_size) do
address = <<curve_id::8, hash_id::8, hash::binary>>
# return tuple of address and timestamp
{[{address, timestamp}], {:ok, fd}}
{[{address, DateTime.from_unix!(timestamp, :millisecond)}], {:ok, fd}}
else
:eof ->
:file.close(fd)
Expand Down
37 changes: 37 additions & 0 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ defmodule Archethic.P2P.Message do
alias __MODULE__.{
AcknowledgeStorage,
AddMiningContext,
AddressList,
Balance,
BeaconSummaryList,
BeaconUpdate,
Expand All @@ -65,6 +66,7 @@ defmodule Archethic.P2P.Message do
GetFirstPublicKey,
GetLastTransaction,
GetLastTransactionAddress,
GetNextAddresses,
GetP2PView,
GetStorageNonce,
GetTransaction,
Expand Down Expand Up @@ -141,6 +143,7 @@ defmodule Archethic.P2P.Message do
| GetBeaconSummariesAggregate.t()
| NotifyPreviousChain.t()
| ShardRepair.t()
| GetNextAddresses.t()

@type response ::
Ok.t()
Expand All @@ -165,6 +168,7 @@ defmodule Archethic.P2P.Message do
| FirstAddress.t()
| ReplicationError.t()
| SummaryAggregate.t()
| AddressList.t()

@floor_upload_speed Application.compile_env!(:archethic, [__MODULE__, :floor_upload_speed])
@content_max_size Application.compile_env!(:archethic, :transaction_data_content_max_size)
Expand Down Expand Up @@ -434,6 +438,14 @@ defmodule Archethic.P2P.Message do
<<34::8, address::binary>>
end

def encode(msg = %GetNextAddresses{}) do
<<35::8, GetNextAddresses.serialize(msg)::bitstring>>
end

def encode(msg = %AddressList{}) do
<<229::8, AddressList.serialize(msg)::bitstring>>
end

def encode(msg = %ShardRepair{}) do
<<230::8, ShardRepair.serialize(msg)::bitstring>>
end
Expand Down Expand Up @@ -972,6 +984,14 @@ defmodule Archethic.P2P.Message do
{%NotifyPreviousChain{address: address}, rest}
end

def decode(<<35::8, rest::bitstring>>) do
GetNextAddresses.deserialize(rest)
end

def decode(<<229::8, rest::bitstring>>) do
AddressList.deserialize(rest)
end

def decode(<<230::8, rest::bitstring>>) do
ShardRepair.deserialize(rest)
end
Expand Down Expand Up @@ -1873,6 +1893,23 @@ defmodule Archethic.P2P.Message do
%Ok{}
end

def process(%GetNextAddresses{address: address}, _) do
case TransactionChain.get_transaction(address, validation_stamp: [:timestamp]) do
{:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: address_timestamp}}} ->
addresses =
TransactionChain.get_genesis_address(address)
|> TransactionChain.list_chain_addresses()
|> Enum.filter(fn {_address, timestamp} ->
DateTime.compare(timestamp, address_timestamp) == :gt
end)

%AddressList{addresses: addresses}

_ ->
%AddressList{addresses: []}
end
end

defp process_replication_chain(tx, replying_node_public_key) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
response =
Expand Down
63 changes: 63 additions & 0 deletions lib/archethic/p2p/message/address_list.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule Archethic.P2P.Message.AddressList do
@moduledoc """
Inform a shard to start repair.
"""
@enforce_keys [:addresses]
defstruct [:addresses]

alias Archethic.Crypto

alias Archethic.Utils
alias Archethic.Utils.VarInt

@type t :: %__MODULE__{addresses: list(Crypto.prepended_hash())}

@doc """
Serialize AddressList Struct
iex> %AddressList{
...> addresses: [<<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>,
...> <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>]
...> } |> AddressList.serialize()
# VarInt
<<1, 2,
# Addresses
0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251,
0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>
"""
def serialize(%__MODULE__{addresses: addresses}) do
<<VarInt.from_value(length(addresses))::binary, :erlang.list_to_binary(addresses)::binary>>
end

@doc """
Deserialize AddressList Struct
iex> # VarInt
...> <<1, 2,
...> # Addresses
...> 0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251,
...> 0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>
...> |> AddressList.deserialize()
{
%AddressList{
addresses: [<<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>,
<<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>]
}, ""}
"""
def deserialize(bin) do
{addresses_length, rest} = VarInt.get_value(bin)

{addresses, rest} = Utils.deserialize_addresses(rest, addresses_length, [])

{%__MODULE__{addresses: addresses}, rest}
end
end
46 changes: 46 additions & 0 deletions lib/archethic/p2p/message/get_next_addresses.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule Archethic.P2P.Message.GetNextAddresses do
@moduledoc """
Inform a shard to start repair.
"""
@enforce_keys [:address]
defstruct [:address]

alias Archethic.Crypto

alias Archethic.Utils

@type t :: %__MODULE__{address: Crypto.prepended_hash()}

@doc """
Serialize GetNextAddresses Struct
iex> %GetNextAddresses{
...> address: <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>
...> } |> GetNextAddresses.serialize()
<<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>
"""
def serialize(%__MODULE__{address: address}) do
<<address::binary>>
end

@doc """
Deserialize GetNextAddresses Struct
iex> <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>
...> |> GetNextAddresses.deserialize()
{
%GetNextAddresses{
address: <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>
}, ""}
"""
def deserialize(bin) do
{address, rest} = Utils.deserialize_address(bin)

{%__MODULE__{address: address}, rest}
end
end
48 changes: 45 additions & 3 deletions lib/archethic/self_repair/repair_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Archethic.SelfRepair.RepairWorker do
@moduledoc false

alias Archethic.{
Contracts,
BeaconChain,
Election,
P2P,
Expand Down Expand Up @@ -32,6 +33,9 @@ defmodule Archethic.SelfRepair.RepairWorker do
address: Base.encode16(first_address)
)

# We get the authorized nodes before the last summary date as we are sure that they know
# the informations we need. Requesting current nodes may ask information to nodes in same repair
# process as we are here.
authorized_nodes =
DateTime.utc_now()
|> BeaconChain.previous_summary_time()
Expand Down Expand Up @@ -124,9 +128,14 @@ defmodule Archethic.SelfRepair.RepairWorker do
with false <- TransactionChain.transaction_exists?(address),
storage_nodes <- Election.chain_storage_nodes(address, authorized_nodes),
{:ok, tx} <- TransactionChain.fetch_transaction_remotely(address, storage_nodes) do
if storage?,
do: Replication.validate_and_store_transaction_chain(tx, true, authorized_nodes),
else: Replication.validate_and_store_transaction(tx, true)
if storage? do
case Replication.validate_and_store_transaction_chain(tx, true, authorized_nodes) do
:ok -> update_last_address(address, authorized_nodes)
error -> error
end
else
Replication.validate_and_store_transaction(tx, true)
end
else
true ->
Logger.debug("Notifier RepairWorker transaction already exists",
Expand All @@ -139,4 +148,37 @@ defmodule Archethic.SelfRepair.RepairWorker do
)
end
end

@doc """
Request missing transaction addresses from last local address until last chain address
and add them in the DB
"""
def update_last_address(address, authorized_nodes) do
# As the node is storage node of this chain, it needs to know all the addresses of the chain until the last
# So we get the local last address and verify if it's the same as the last address of the chain
# by requesting the nodes which already know the last address

{last_local_address, _timestamp} = TransactionChain.get_last_address(address)
storage_nodes = Election.storage_nodes(last_local_address, authorized_nodes)

case TransactionChain.fetch_next_chain_addresses_remotely(last_local_address, storage_nodes) do
{:ok, []} ->
:ok

{:ok, addresses} ->
genesis_address = TransactionChain.get_genesis_address(address)

addresses
|> Enum.sort_by(fn {_address, timestamp} -> timestamp end)
|> Enum.each(fn {address, timestamp} ->
TransactionChain.register_last_address(genesis_address, address, timestamp)
end)

# Stop potential previous smart contract
Contracts.stop_contract(address)

_ ->
:ok
end
end
end
Loading

0 comments on commit 497d951

Please sign in to comment.