Skip to content

Commit

Permalink
Fix self repair notifier (#645 )
Browse files Browse the repository at this point in the history
* Fix notifier workflow for storage transaction
* Add IO transactions
* Add before? flag for self repair to avoid using the newly authorized nodes
* Start Notifier after availability update to ensure nodes finished their self repair
* Ensure addresses are expected to be replicated
* Request missing chain addresses while new transaction stored
* Fix beacon live downloading aggregate from BD instead of quorum
* Store beacon aggregate when notifier is triggered

Co-authored-by: Neylix <julien.leclerc05@protonmail.com>
  • Loading branch information
apoorv-2204 and Neylix committed Dec 2, 2022
1 parent 2b52177 commit 091d8e8
Show file tree
Hide file tree
Showing 31 changed files with 1,612 additions and 422 deletions.
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ compile_c_programs:
mkdir -p priv/c_dist
$(CC) src/c/crypto/stdio_helpers.c src/c/crypto/ed25519.c -o priv/c_dist/libsodium_port -I src/c/crypto/stdio_helpers.h -lsodium
$(CC) src/c/hypergeometric_distribution.c -o priv/c_dist/hypergeometric_distribution -lgmp

git submodule update --force --recursive --init --remote
$(MAKE) -C src/c/nat/miniupnp/miniupnpc
cp src/c/nat/miniupnp/miniupnpc/build/upnpc-static priv/c_dist/upnpc


ifeq ($(TPM_INSTALLED),0)
$(CC) src/c/crypto/stdio_helpers.c src/c/crypto/tpm/lib.c src/c/crypto/tpm/port.c -o priv/c_dist/tpm_port -I src/c/crypto/stdio_helpers.h -I src/c/crypto/tpm/lib.h $(TPMFLAGS)
Expand All @@ -24,6 +24,7 @@ endif
clean:
rm -f priv/c_dist/*
mix archethic.clean_db
mix clean

docker-clean: clean
docker container stop $$(docker ps -a --filter=name=utn* -q)
Expand Down
1 change: 1 addition & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ config :git_hooks,
pre_push: [
tasks: [
{:cmd, "mix clean"},
{:cmd, "mix git_hooks.install"},
{:cmd, "mix hex.outdated --within-requirements"},
{:cmd, "mix format --check-formatted"},
{:cmd, "mix compile --warnings-as-errors"},
Expand Down
9 changes: 1 addition & 8 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -320,17 +320,10 @@ defmodule Archethic.BeaconChain do
end

defp get_summary_address_by_node(date, subset, authorized_nodes) do
# Remove the newly authorized nodes at this specific time
filter_nodes =
case authorized_nodes do
[node] -> [node]
nodes -> Enum.filter(nodes, &(DateTime.compare(&1.authorization_date, date) == :lt))
end

summary_address = Crypto.derive_beacon_chain_address(subset, date, true)

subset
|> Election.beacon_storage_nodes(date, filter_nodes)
|> Election.beacon_storage_nodes(date, authorized_nodes)
|> Enum.map(fn node ->
{node, summary_address}
end)
Expand Down
7 changes: 1 addition & 6 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,7 @@ defmodule Archethic.BeaconChain.Subset do
end

defp broadcast_beacon_slot(subset, next_time, slot) do
# Remove the newly authorized nodes at this specific time
nodes =
case P2P.authorized_and_available_nodes(next_time) do
[node] -> [node]
nodes -> Enum.filter(nodes, &(DateTime.compare(&1.authorization_date, next_time) == :lt))
end
nodes = P2P.authorized_and_available_nodes(next_time, true)

subset
|> Election.beacon_storage_nodes(next_time, nodes)
Expand Down
4 changes: 3 additions & 1 deletion lib/archethic/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule Archethic.DB do
@callback list_addresses_by_type(Transaction.transaction_type()) ::
Enumerable.t() | list(binary())
@callback list_chain_addresses(binary()) ::
Enumerable.t() | list({binary(), non_neg_integer()})
Enumerable.t() | list({binary(), DateTime.t()})

@callback get_last_chain_address(binary()) :: {binary(), DateTime.t()}
@callback get_last_chain_address(binary(), DateTime.t()) :: {binary(), DateTime.t()}
Expand Down Expand Up @@ -79,4 +79,6 @@ defmodule Archethic.DB do
:ok
@callback get_inputs(ledger :: :UCO | :token, address :: binary()) ::
list(VersionedTransactionInput.t())

@callback stream_first_addresses() :: Enumerable.t()
end
26 changes: 20 additions & 6 deletions lib/archethic/db/embedded_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ defmodule Archethic.DB.EmbeddedImpl do
Stream all the addresses from the Genesis address(following it).
"""
@spec list_chain_addresses(binary()) ::
Enumerable.t() | list({binary(), non_neg_integer()})
Enumerable.t() | list({binary(), DateTime.t()})
def list_chain_addresses(address) when is_binary(address) do
ChainIndex.list_chain_addresses(address, db_path())
end
Expand Down Expand Up @@ -249,16 +249,16 @@ defmodule Archethic.DB.EmbeddedImpl do
end

@doc """
Reference a last address from a previous address
Reference a last address from a genesis address
"""
@spec add_last_transaction_address(
previous_address :: binary(),
genesis_address :: binary(),
address :: binary(),
tx_time :: DateTime.t()
) :: :ok
def add_last_transaction_address(previous_address, address, date = %DateTime{})
when is_binary(previous_address) and is_binary(address) do
ChainIndex.set_last_chain_address(previous_address, address, date, db_path())
def add_last_transaction_address(genesis_address, address, date = %DateTime{})
when is_binary(genesis_address) and is_binary(address) do
ChainIndex.set_last_chain_address(genesis_address, address, date, db_path())
end

@doc """
Expand Down Expand Up @@ -384,4 +384,18 @@ defmodule Archethic.DB.EmbeddedImpl do
@spec get_inputs(ledger :: :UCO | :token, address :: binary()) ::
list(VersionedTransactionInput.t())
defdelegate get_inputs(ledger, address), to: InputsReader, as: :get_inputs

@doc """
Stream first transactions address of a chain from genesis_address.
"""
@spec stream_first_addresses :: Enumerable.t()
def stream_first_addresses do
ChainIndex.list_genesis_addresses()
|> Stream.map(fn gen_address ->
gen_address
|> list_chain_addresses()
|> Enum.at(0)
|> elem(0)
end)
end
end
51 changes: 28 additions & 23 deletions lib/archethic/db/embedded_impl/chain_index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,22 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
alias Archethic.DB.EmbeddedImpl.ChainWriter
alias Archethic.TransactionChain.Transaction

@archethic_db_tx_index :archethic_db_tx_index
@archethic_db_chain_stats :archethic_db_chain_stats
@archethic_db_last_index :archethic_db_last_index
@archethic_db_type_stats :archethic_db_type_stats

def start_link(arg \\ []) do
GenServer.start_link(__MODULE__, arg, name: __MODULE__)
end

def init(opts) do
db_path = Keyword.fetch!(opts, :path)

:ets.new(:archethic_db_tx_index, [:set, :named_table, :public, read_concurrency: true])
:ets.new(:archethic_db_chain_stats, [:set, :named_table, :public, read_concurrency: true])
:ets.new(:archethic_db_last_index, [:set, :named_table, :public, read_concurrency: true])
:ets.new(:archethic_db_type_stats, [:set, :named_table, :public, read_concurrency: true])
:ets.new(@archethic_db_tx_index, [:set, :named_table, :public, read_concurrency: true])
:ets.new(@archethic_db_chain_stats, [:set, :named_table, :public, read_concurrency: true])
:ets.new(@archethic_db_last_index, [:set, :named_table, :public, read_concurrency: true])
:ets.new(@archethic_db_type_stats, [:set, :named_table, :public, read_concurrency: true])

fill_tables(db_path)

Expand Down Expand Up @@ -60,12 +65,12 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do

true =
:ets.insert(
:archethic_db_tx_index,
@archethic_db_tx_index,
{current_address, %{size: size, offset: offset, genesis_address: genesis_address}}
)

:ets.update_counter(
:archethic_db_chain_stats,
@archethic_db_chain_stats,
genesis_address,
[
{2, size},
Expand All @@ -89,7 +94,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do

true =
:ets.insert(
:archethic_db_last_index,
@archethic_db_last_index,
{genesis_address, last_address, DateTime.to_unix(timestamp, :millisecond)}
)
end)
Expand All @@ -100,10 +105,10 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
case File.open(type_path(db_path, type), [:read, :binary]) do
{:ok, fd} ->
nb_txs = do_scan_types(fd)
:ets.insert(:archethic_db_type_stats, {type, nb_txs})
:ets.insert(@archethic_db_type_stats, {type, nb_txs})

{:error, _} ->
:ets.insert(:archethic_db_type_stats, {type, 0})
:ets.insert(@archethic_db_type_stats, {type, 0})
end
end)
end
Expand Down Expand Up @@ -142,12 +147,12 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
# Write fast lookup entry for this transaction on memory
true =
:ets.insert(
:archethic_db_tx_index,
@archethic_db_tx_index,
{tx_address, %{size: size, offset: last_offset, genesis_address: genesis_address}}
)

:ets.update_counter(
:archethic_db_chain_stats,
@archethic_db_chain_stats,
genesis_address,
[
{2, size},
Expand All @@ -162,7 +167,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
@spec get_file_stats(binary()) ::
{offset :: non_neg_integer(), nb_transactions :: non_neg_integer()}
def get_file_stats(genesis_address) do
case :ets.lookup(:archethic_db_chain_stats, genesis_address) do
case :ets.lookup(@archethic_db_chain_stats, genesis_address) do
[{_, last_offset, nb_txs}] ->
{last_offset, nb_txs}

Expand Down Expand Up @@ -207,7 +212,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
"""
@spec get_tx_entry(binary(), String.t()) :: {:ok, map()} | {:error, :not_exists}
def get_tx_entry(address, db_path) do
case :ets.lookup(:archethic_db_tx_index, address) do
case :ets.lookup(@archethic_db_tx_index, address) do
[] ->
# If the transaction is not found in the in memory lookup
# we scan the index file for the subset of the transaction to find the relative information
Expand Down Expand Up @@ -299,7 +304,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
Stream all the transaction addresses from genesis_address-address.
"""
@spec list_chain_addresses(binary(), String.t()) ::
Enumerable.t() | list({binary(), non_neg_integer()})
Enumerable.t() | list({binary(), DateTime.t()})
def list_chain_addresses(address, db_path) when is_binary(address) do
filepath = chain_addresses_path(db_path, address)

Expand All @@ -316,7 +321,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
{:ok, hash} <- :file.read(fd, hash_size) do
address = <<curve_id::8, hash_id::8, hash::binary>>
# return tuple of address and timestamp
{[{address, timestamp}], {:ok, fd}}
{[{address, DateTime.from_unix!(timestamp, :millisecond)}], {:ok, fd}}
else
:eof ->
:file.close(fd)
Expand All @@ -335,7 +340,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
"""
@spec count_transactions_by_type(Transaction.transaction_type()) :: non_neg_integer()
def count_transactions_by_type(type) do
case :ets.lookup(:archethic_db_type_stats, type) do
case :ets.lookup(@archethic_db_type_stats, type) do
[] ->
0

Expand All @@ -351,7 +356,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
:ok
def add_tx_type(type, address, db_path) do
File.write!(type_path(db_path, type), address, [:append, :binary])
:ets.update_counter(:archethic_db_type_stats, type, {2, 1}, {type, 0})
:ets.update_counter(@archethic_db_type_stats, type, {2, 1}, {type, 0})
:ok
end

Expand All @@ -372,15 +377,15 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
filename = chain_addresses_path(db_path, genesis_address)

write_last_chain_transaction? =
case :ets.lookup(:archethic_db_last_index, genesis_address) do
case :ets.lookup(@archethic_db_last_index, genesis_address) do
[{_, ^new_address, _}] -> false
[{_, _, chain_unix_time}] when unix_time < chain_unix_time -> false
_ -> true
end

if write_last_chain_transaction? do
:ok = File.write!(filename, encoded_data, [:binary, :append])
true = :ets.insert(:archethic_db_last_index, {genesis_address, new_address, unix_time})
true = :ets.insert(@archethic_db_last_index, {genesis_address, new_address, unix_time})
end

:ok
Expand All @@ -396,7 +401,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
case get_tx_entry(address, db_path) do
{:ok, %{genesis_address: genesis_address}} ->
# Search in the latest in memory index
case :ets.lookup(:archethic_db_last_index, genesis_address) do
case :ets.lookup(@archethic_db_last_index, genesis_address) do
[] ->
# If not present, the we search in the index file
unix_time = DateTime.utc_now() |> DateTime.to_unix(:millisecond)
Expand All @@ -410,7 +415,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do

{:error, :not_exists} ->
# We try if the request address is the genesis address to fetch the in memory index
case :ets.lookup(:archethic_db_last_index, address) do
case :ets.lookup(@archethic_db_last_index, address) do
[] ->
# If not present, the we search in the index file
unix_time = DateTime.utc_now() |> DateTime.to_unix(:millisecond)
Expand Down Expand Up @@ -610,14 +615,14 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
end

defp stream_genesis_addresses(acc = []) do
case :ets.first(:archethic_db_chain_stats) do
case :ets.first(@archethic_db_chain_stats) do
:"$end_of_table" -> {:halt, acc}
first_key -> {[first_key], first_key}
end
end

defp stream_genesis_addresses(acc) do
case :ets.next(:archethic_db_chain_stats, acc) do
case :ets.next(@archethic_db_chain_stats, acc) do
:"$end_of_table" -> {:halt, acc}
next_key -> {[next_key], next_key}
end
Expand Down
Loading

0 comments on commit 091d8e8

Please sign in to comment.