Skip to content

Commit

Permalink
Resolve last address conflicts (#527)
Browse files Browse the repository at this point in the history
* Return last transaction address time from DB

* Add timestamp in the LastTransactionAddress message

* Handle conflict on last transaction address
  • Loading branch information
Samuel committed Aug 19, 2022
1 parent b14e7c5 commit 20f7548
Show file tree
Hide file tree
Showing 20 changed files with 220 additions and 132 deletions.
4 changes: 2 additions & 2 deletions lib/archethic/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ defmodule Archethic.DB do

@callback list_addresses_by_type(Transaction.transaction_type()) ::
Enumerable.t() | list(binary())
@callback get_last_chain_address(binary()) :: binary()
@callback get_last_chain_address(binary(), DateTime.t()) :: binary()
@callback get_last_chain_address(binary()) :: {binary(), DateTime.t()}
@callback get_last_chain_address(binary(), DateTime.t()) :: {binary(), DateTime.t()}
@callback get_first_chain_address(binary()) :: binary()
@callback get_first_public_key(Crypto.key()) :: binary()

Expand Down
6 changes: 4 additions & 2 deletions lib/archethic/db/embedded_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,10 @@ defmodule Archethic.DB.EmbeddedImpl do
end

@doc """
Return the last address from the given transaction's address until the given date
Return the last address from the given transaction's address until the given date along with its timestamp
"""
@spec get_last_chain_address(address :: binary(), until :: DateTime.t()) :: binary()
@spec get_last_chain_address(address :: binary(), until :: DateTime.t()) ::
{address :: binary(), last_address_timestamp :: DateTime.t()}
def get_last_chain_address(address, date = %DateTime{} \\ DateTime.utc_now())
when is_binary(address) do
ChainIndex.get_last_chain_address(address, date, db_path())
Expand Down Expand Up @@ -248,6 +249,7 @@ defmodule Archethic.DB.EmbeddedImpl do
def list_last_transaction_addresses do
ChainIndex.list_genesis_addresses()
|> Stream.map(&get_last_chain_address/1)
|> Stream.map(fn {address, _time} -> address end)
end

@doc """
Expand Down
76 changes: 46 additions & 30 deletions lib/archethic/db/embedded_impl/chain_index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
scan_summary_table(subset_summary_filename)
end)

fill_last_addresses(db_path)
fill_type_stats(db_path)
end

Expand All @@ -56,9 +57,6 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
current_address = <<current_curve_id::8, current_hash_type::8, current_digest::binary>>
genesis_address = <<genesis_curve_id::8, genesis_hash_type::8, genesis_digest::binary>>

# Register last addresses of genesis address
true = :ets.insert(:archethic_db_last_index, {genesis_address, current_address})

true =
:ets.insert(
:archethic_db_tx_index,
Expand All @@ -83,6 +81,19 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
end
end

defp fill_last_addresses(db_path) do
Enum.each(list_genesis_addresses(), fn genesis_address ->
# Register last addresses of genesis address
{last_address, timestamp} = get_last_chain_address(genesis_address, db_path)

true =
:ets.insert(
:archethic_db_last_index,
{genesis_address, last_address, DateTime.to_unix(timestamp, :millisecond)}
)
end)
end

defp fill_type_stats(db_path) do
Enum.each(Transaction.types(), fn type ->
case File.open(type_path(db_path, type), [:read, :binary]) do
Expand Down Expand Up @@ -325,14 +336,15 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
filename = chain_addresses_path(db_path, genesis_address)

:ok = File.write!(filename, encoded_data, [:binary, :append])
true = :ets.insert(:archethic_db_last_index, {genesis_address, new_address})
true = :ets.insert(:archethic_db_last_index, {genesis_address, new_address, unix_time})
:ok
end

@doc """
Return the last address of the chain
"""
@spec get_last_chain_address(address :: binary(), db_path :: String.t()) :: binary()
@spec get_last_chain_address(address :: binary(), db_path :: String.t()) ::
{binary(), DateTime.t()}
def get_last_chain_address(address, db_path) do
# We try with a transaction on a chain, to identity the genesis address
case get_tx_entry(address, db_path) do
Expand All @@ -343,20 +355,25 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
# If not present, the we search in the index file
unix_time = DateTime.utc_now() |> DateTime.to_unix(:millisecond)

search_last_address_until(genesis_address, unix_time, db_path) || address
search_last_address_until(genesis_address, unix_time, db_path) ||
{address, DateTime.utc_now()}

[{_, last_address}] ->
last_address
[{_, last_address, last_time}] ->
{last_address, DateTime.from_unix!(last_time, :millisecond)}
end

{:error, :not_exists} ->
# We try if the request address is the genesis address to fetch the in memory index
case :ets.lookup(:archethic_db_last_index, address) do
[] ->
address
# If not present, the we search in the index file
unix_time = DateTime.utc_now() |> DateTime.to_unix(:millisecond)

[{_, last_address}] ->
last_address
search_last_address_until(address, unix_time, db_path) ||
{address, DateTime.utc_now()}

[{_, last_address, last_time}] ->
{last_address, DateTime.from_unix!(last_time, :millisecond)}
end
end
end
Expand All @@ -365,28 +382,31 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
Return the last address of the chain before or equal to the given date
"""
@spec get_last_chain_address(address :: binary(), until :: DateTime.t(), db_path :: String.t()) ::
binary()
{last_addresss :: binary(), last_time :: DateTime.t()}
def get_last_chain_address(address, datetime = %DateTime{}, db_path) do
unix_time = DateTime.to_unix(datetime, :millisecond)

# We get the genesis address of this given transaction address
case get_tx_entry(address, db_path) do
{:ok, %{genesis_address: genesis_address}} ->
search_last_address_until(genesis_address, unix_time, db_path) || address
search_last_address_until(genesis_address, unix_time, db_path) || {address, datetime}

{:error, :not_exists} ->
# We try to search with given address as genesis address
# Then `address` acts the genesis address
search_last_address_until(address, unix_time, db_path) || address
search_last_address_until(address, unix_time, db_path) || {address, datetime}
end
end

defp search_last_address_until(genesis_address, until, db_path) do
filepath = chain_addresses_path(db_path, genesis_address)

case File.open(filepath, [:binary, :read]) do
{:ok, fd} ->
do_search_last_address_until(fd, until)
with {:ok, fd} <- File.open(filepath, [:binary, :read]),
{address, timestamp} <- do_search_last_address_until(fd, until) do
{address, DateTime.from_unix!(timestamp, :millisecond)}
else
nil ->
nil

{:error, _} ->
nil
Expand All @@ -400,21 +420,17 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
{:ok, hash} <- :file.read(fd, hash_size) do
address = <<curve_id::8, hash_id::8, hash::binary>>

if timestamp < until do
do_search_last_address_until(fd, until, address)
else
cond do
timestamp == until ->
:file.close(fd)
address
cond do
timestamp < until ->
do_search_last_address_until(fd, until, {address, timestamp})

timestamp < until ->
do_search_last_address_until(fd, until, address)
timestamp == until ->
:file.close(fd)
{address, timestamp}

true ->
:file.close(fd)
acc
end
true ->
:file.close(fd)
acc
end
else
:eof ->
Expand Down
14 changes: 7 additions & 7 deletions lib/archethic/mining/pending_transaction_validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -321,19 +321,19 @@ defmodule Archethic.Mining.PendingTransactionValidation do
type: :mint_rewards,
data: %TransactionData{content: content}
}) do
total_fee = DB.get_latest_burned_fees()

with :ok <- verify_token_creation(content),
{:ok, %{"supply" => supply}} <- Jason.decode(content),
true <- supply == DB.get_latest_burned_fees(),
{:ok, %{"supply" => ^total_fee}} <- Jason.decode(content),
network_pool_address <- SharedSecrets.get_network_pool_address(),
false <-
DB.get_last_chain_address(network_pool_address, Reward.last_scheduling_date()) !=
network_pool_address do
{^network_pool_address, _} <-
DB.get_last_chain_address(network_pool_address, Reward.last_scheduling_date()) do
:ok
else
false ->
{:ok, %{"supply" => _}} ->
{:error, "The supply do not match burned fees from last summary"}

true ->
{_, _} ->
{:error, "There is already a mint rewards transaction since last schedule"}

e ->
Expand Down
7 changes: 3 additions & 4 deletions lib/archethic/oracle_chain/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,9 @@ defmodule Archethic.OracleChain.Scheduler do
end

defp chain_size(summary_date = %DateTime{}) do
summary_date
|> Crypto.derive_oracle_address(0)
|> TransactionChain.get_last_address()
|> TransactionChain.size()
oracle_genesis_address = Crypto.derive_oracle_address(summary_date, 0)
{last_address, _} = TransactionChain.get_last_address(oracle_genesis_address)
TransactionChain.size(last_address)
end

defp get_oracle_data(address) do
Expand Down
16 changes: 10 additions & 6 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,8 @@ defmodule Archethic.P2P.Message do
<<240::8, Summary.serialize(summary)::bitstring>>
end

def encode(%LastTransactionAddress{address: address}) do
<<241::8, address::binary>>
def encode(%LastTransactionAddress{address: address, timestamp: timestamp}) do
<<241::8, address::binary, DateTime.to_unix(timestamp, :millisecond)::64>>
end

def encode(%FirstPublicKey{public_key: public_key}) do
Expand Down Expand Up @@ -971,8 +971,12 @@ defmodule Archethic.P2P.Message do
end

def decode(<<241::8, rest::bitstring>>) do
{address, rest} = Utils.deserialize_address(rest)
{%LastTransactionAddress{address: address}, rest}
{address, <<timestamp::64, rest::bitstring>>} = Utils.deserialize_address(rest)

{%LastTransactionAddress{
address: address,
timestamp: DateTime.from_unix!(timestamp, :millisecond)
}, rest}
end

def decode(<<242::8, rest::bitstring>>) do
Expand Down Expand Up @@ -1433,8 +1437,8 @@ defmodule Archethic.P2P.Message do
end

def process(%GetLastTransactionAddress{address: address, timestamp: timestamp}) do
address = TransactionChain.get_last_address(address, timestamp)
%LastTransactionAddress{address: address}
{address, time} = TransactionChain.get_last_address(address, timestamp)
%LastTransactionAddress{address: address, timestamp: time}
end

def process(%NotifyLastTransactionAddress{
Expand Down
5 changes: 3 additions & 2 deletions lib/archethic/p2p/message/last_transaction_address.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ defmodule Archethic.P2P.Message.LastTransactionAddress do
Represents a message with the last address key from a transaction chain
"""
@enforce_keys [:address]
defstruct [:address]
defstruct [:address, :timestamp]

alias Archethic.Crypto

@type t :: %__MODULE__{
address: Crypto.versioned_hash()
address: Crypto.versioned_hash(),
timestamp: DateTime.t()
}
end
25 changes: 14 additions & 11 deletions lib/archethic/transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ defmodule Archethic.TransactionChain do
defdelegate list_addresses_by_type(type), to: DB

@doc """
Get the last transaction address from a transaction chain
Get the last transaction address from a transaction chain with the latest time
"""
@spec get_last_address(binary()) :: binary()
@spec get_last_address(binary()) :: {binary(), DateTime.t()}
defdelegate get_last_address(address),
to: DB,
as: :get_last_chain_address

@doc """
Get the last transaction address from a transaction chain before a given date
Get the last transaction address from a transaction chain before a given date along its last time
"""
@spec get_last_address(binary(), DateTime.t()) :: binary()
@spec get_last_address(binary(), DateTime.t()) :: {binary(), DateTime.t()}
defdelegate get_last_address(address, timestamp),
to: DB,
as: :get_last_chain_address
Expand Down Expand Up @@ -238,9 +238,8 @@ defmodule Archethic.TransactionChain do
| {:error, :transaction_not_exists}
| {:error, :invalid_transaction}
def get_last_transaction(address, fields \\ []) when is_binary(address) and is_list(fields) do
address
|> get_last_address()
|> get_transaction(fields)
{address, _} = get_last_address(address)
get_transaction(address, fields)
end

@doc """
Expand Down Expand Up @@ -563,10 +562,14 @@ defmodule Archethic.TransactionChain do
{:ok, binary()} | {:error, :network_issue}
def fetch_last_address_remotely(address, nodes, timestamp = %DateTime{} \\ DateTime.utc_now())
when is_binary(address) and is_list(nodes) do
# TODO: implement conflict resolver to get the latest address
conflict_resolver = fn results ->
Enum.max_by(results, &DateTime.to_unix(&1.timestamp, :millisecond))
end

case P2P.quorum_read(
nodes,
%GetLastTransactionAddress{address: address, timestamp: timestamp}
%GetLastTransactionAddress{address: address, timestamp: timestamp},
conflict_resolver
) do
{:ok, %LastTransactionAddress{address: last_address}} ->
{:ok, last_address}
Expand Down Expand Up @@ -844,8 +847,8 @@ defmodule Archethic.TransactionChain do
case fetch_genesis_address_remotely(address) do
{:ok, genesis_address} ->
case get_last_address(genesis_address) do
^genesis_address -> nil
last_address -> last_address
{^genesis_address, _} -> nil
{last_address, _} -> last_address
end

_ ->
Expand Down
7 changes: 4 additions & 3 deletions lib/archethic_web/live/chains/oracle_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ defmodule ArchethicWeb.OracleChainLive do
end

defp list_transactions_by_date(date = %DateTime{}) do
date
|> Crypto.derive_oracle_address(0)
|> TransactionChain.get_last_address()
oracle_genesis_address = Crypto.derive_oracle_address(date, 0)
{last_address, _} = TransactionChain.get_last_address(oracle_genesis_address)

last_address
|> get_transaction_chain()
|> Stream.map(fn %Transaction{
address: address,
Expand Down
8 changes: 4 additions & 4 deletions test/archethic/bootstrap/network_init_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do

MockClient
|> stub(:send_message, fn _, %GetLastTransactionAddress{address: address}, _ ->
{:ok, %LastTransactionAddress{address: address}}
{:ok, %LastTransactionAddress{address: address, timestamp: DateTime.utc_now()}}
end)

MockDB
Expand Down Expand Up @@ -306,7 +306,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do
{:ok, %TransactionInputList{inputs: []}}

_, %GetLastTransactionAddress{address: address}, _ ->
{:ok, %LastTransactionAddress{address: address}}
{:ok, %LastTransactionAddress{address: address, timestamp: DateTime.utc_now()}}

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}
Expand Down Expand Up @@ -354,7 +354,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do
{:ok, %TransactionInputList{inputs: []}}

_, %GetLastTransactionAddress{address: address}, _ ->
{:ok, %LastTransactionAddress{address: address}}
{:ok, %LastTransactionAddress{address: address, timestamp: DateTime.utc_now()}}

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}
Expand Down Expand Up @@ -401,7 +401,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do
{:ok, %TransactionInputList{inputs: []}}

_, %GetLastTransactionAddress{address: address}, _ ->
{:ok, %LastTransactionAddress{address: address}}
{:ok, %LastTransactionAddress{address: address, timestamp: DateTime.utc_now()}}

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}
Expand Down
Loading

0 comments on commit 20f7548

Please sign in to comment.