Skip to content

Commit

Permalink
Unify quorum read with local availability and nearest nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Nov 17, 2022
1 parent 94db819 commit 380cbea
Show file tree
Hide file tree
Showing 17 changed files with 179 additions and 193 deletions.
50 changes: 8 additions & 42 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Archethic do
alias __MODULE__.Mining

alias __MODULE__.P2P
alias __MODULE__.P2P.Node

alias __MODULE__.DB

Expand All @@ -19,7 +20,6 @@ defmodule Archethic do
alias __MODULE__.P2P.Message.NewTransaction
alias __MODULE__.P2P.Message.Ok
alias __MODULE__.P2P.Message.StartMining
alias __MODULE__.P2P.Node

alias __MODULE__.TransactionChain
alias __MODULE__.TransactionChain.Transaction
Expand All @@ -37,13 +37,7 @@ defmodule Archethic do
| {:error, :network_issue}
def search_transaction(address) when is_binary(address) do
storage_nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())

nodes =
storage_nodes
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

TransactionChain.fetch_transaction_remotely(address, nodes)
TransactionChain.fetch_transaction_remotely(address, storage_nodes)
end

@doc """
Expand Down Expand Up @@ -116,12 +110,7 @@ defmodule Archethic do
def get_last_transaction(address) when is_binary(address) do
case get_last_transaction_address(address) do
{:ok, last_address} ->
nodes =
last_address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

nodes = Election.chain_storage_nodes(last_address, P2P.authorized_and_available_nodes())
TransactionChain.fetch_transaction_remotely(last_address, nodes)

{:error, :network_issue} = e ->
Expand All @@ -146,8 +135,6 @@ defmodule Archethic do
def get_balance(address) when is_binary(address) do
address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
|> get_balance(address)
end

Expand Down Expand Up @@ -189,11 +176,7 @@ defmodule Archethic do
end

defp do_get_transaction_inputs(address) do
nodes =
address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())

address
|> TransactionChain.stream_inputs_remotely(nodes, DateTime.utc_now())
Expand All @@ -211,11 +194,7 @@ defmodule Archethic do
def get_transaction_inputs(address, page, limit)
when is_binary(address) and is_integer(page) and page >= 0 and is_integer(limit) and
limit >= 0 do
nodes =
address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())

{inputs, _more?, _offset} =
TransactionChain.fetch_inputs_remotely(address, nodes, DateTime.utc_now(), page, limit)
Expand All @@ -228,11 +207,7 @@ defmodule Archethic do
"""
@spec get_transaction_chain(binary()) :: {:ok, list(Transaction.t())} | {:error, :network_issue}
def get_transaction_chain(address) when is_binary(address) do
nodes =
address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())

# We directly check if the transaction exists and retrieve the genesis
# Otherwise we are requesting the genesis address remotly
Expand Down Expand Up @@ -290,11 +265,7 @@ defmodule Archethic do
@spec get_transaction_chain_by_paging_address(binary(), binary()) ::
{:ok, list(Transaction.t())} | {:error, :network_issue}
def get_transaction_chain_by_paging_address(address, paging_address) when is_binary(address) do
nodes =
address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())

try do
{local_chain, paging_address} =
Expand Down Expand Up @@ -331,12 +302,7 @@ defmodule Archethic do
@spec get_transaction_chain_length(binary()) ::
{:ok, non_neg_integer()} | {:error, :network_issue}
def get_transaction_chain_length(address) when is_binary(address) do
nodes =
address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

nodes = Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())
TransactionChain.fetch_size_remotely(address, nodes)
end
end
10 changes: 4 additions & 6 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ defmodule Archethic.Bootstrap do
reward_address
)

post_bootstrap(patch: network_patch, sync?: true)
post_bootstrap(sync?: true)
else
if Sync.require_update?(ip, port, http_port, transport, last_sync_date) do
Logger.info("Update node chain...")
Expand All @@ -193,20 +193,18 @@ defmodule Archethic.Bootstrap do
reward_address
)

post_bootstrap(patch: network_patch, sync?: true)
post_bootstrap(sync?: true)
else
post_bootstrap(patch: network_patch, sync?: false)
post_bootstrap(sync?: false)
end
end
end
end

defp post_bootstrap(opts) do
if Keyword.get(opts, :sync?, true) do
patch = Keyword.fetch!(opts, :patch)

Logger.info("Synchronization started")
:ok = SelfRepair.bootstrap_sync(SelfRepair.last_sync_date(), patch)
:ok = SelfRepair.bootstrap_sync(SelfRepair.last_sync_date())
Logger.info("Synchronization finished")
end

Expand Down
8 changes: 1 addition & 7 deletions lib/archethic/mining/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,8 @@ defmodule Archethic.Mining.TransactionContext do
end

defp previous_nodes_distribution(previous_address, nb_sub_lists, sample_size) do
node_list =
P2P.unprioritize_node(P2P.authorized_and_available_nodes(), Crypto.first_node_public_key())

elected_nodes =
previous_address
|> Election.chain_storage_nodes(node_list)
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)
Election.chain_storage_nodes(previous_address, P2P.authorized_and_available_nodes())

if Enum.count(elected_nodes) < sample_size do
Enum.map(1..nb_sub_lists, fn _ -> elected_nodes end)
Expand Down
6 changes: 5 additions & 1 deletion lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,11 @@ defmodule Archethic.P2P do
)

def quorum_read(nodes, message, conflict_resolver, timeout, consistency_level) do
do_quorum_read(nodes, message, conflict_resolver, timeout, consistency_level, nil)
nodes
|> Enum.filter(&Node.locally_available?/1)
|> nearest_nodes()
|> unprioritize_node(Crypto.first_node_public_key())
|> do_quorum_read(message, conflict_resolver, timeout, consistency_level, nil)
end

defp do_quorum_read([], _, _, _, _, nil), do: {:error, :network_issue}
Expand Down
43 changes: 13 additions & 30 deletions lib/archethic/replication/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ defmodule Archethic.Replication.TransactionContext do

alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Node

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionInput
Expand All @@ -19,10 +16,10 @@ defmodule Archethic.Replication.TransactionContext do
"""
@spec fetch_transaction(address :: Crypto.versioned_hash(), list(Node.t())) ::
Transaction.t() | nil
def fetch_transaction(address, download_nodes) when is_binary(address) do
nodes = replication_nodes(address, download_nodes)
def fetch_transaction(address, node_list) when is_binary(address) do
storage_nodes = Election.chain_storage_nodes(address, node_list)

case TransactionChain.fetch_transaction_remotely(address, nodes) do
case TransactionChain.fetch_transaction_remotely(address, storage_nodes) do
{:ok, tx} ->
tx

Expand All @@ -36,16 +33,17 @@ defmodule Archethic.Replication.TransactionContext do
"""
@spec stream_transaction_chain(address :: Crypto.versioned_hash(), list(Node.t())) ::
Enumerable.t() | list(Transaction.t())
def stream_transaction_chain(address, download_nodes) when is_binary(address) do
case replication_nodes(address, download_nodes) do
def stream_transaction_chain(address, node_list) when is_binary(address) do
storage_nodes = Election.chain_storage_nodes(address, node_list)
paging_address = TransactionChain.get_last_local_address(address)

case storage_nodes do
[] ->
[]

nodes ->
paging_address = TransactionChain.get_last_local_address(address)

_ ->
if paging_address != address do
TransactionChain.stream_remotely(address, nodes, paging_address)
TransactionChain.stream_remotely(address, storage_nodes, paging_address)
else
[]
end
Expand All @@ -57,27 +55,12 @@ defmodule Archethic.Replication.TransactionContext do
"""
@spec fetch_transaction_inputs(address :: Crypto.versioned_hash(), DateTime.t(), list(Node.t())) ::
list(TransactionInput.t())
def fetch_transaction_inputs(address, timestamp = %DateTime{}, download_nodes)
def fetch_transaction_inputs(address, timestamp = %DateTime{}, node_list)
when is_binary(address) do
nodes = replication_nodes(address, download_nodes)
storage_nodes = Election.chain_storage_nodes(address, node_list)

address
|> TransactionChain.stream_inputs_remotely(nodes, timestamp)
|> TransactionChain.stream_inputs_remotely(storage_nodes, timestamp)
|> Enum.to_list()
end

defp replication_nodes(address, download_nodes) do
address
# returns the storage nodes for the transaction chain based on the transaction address
# from a list of available node
|> Election.chain_storage_nodes(download_nodes)
# Returns the nearest storages nodes from the local node as per the patch
# when the input is a list of nodes
|> P2P.nearest_nodes()
# Determine if the node is locally available based on its availability history.
# If the last exchange with node was succeed the node is considered as available
|> Enum.filter(&Node.locally_available?/1)
# Reorder a list of nodes to ensure the current node is only called at the end
|> P2P.unprioritize_node(Crypto.first_node_public_key())
end
end
8 changes: 4 additions & 4 deletions lib/archethic/self_repair.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ defmodule Archethic.SelfRepair do
@doc """
Start the bootstrap's synchronization process using the last synchronization date
"""
@spec bootstrap_sync(last_sync_date :: DateTime.t(), network_patch :: binary()) :: :ok
def bootstrap_sync(date = %DateTime{}, patch) when is_binary(patch) do
@spec bootstrap_sync(last_sync_date :: DateTime.t()) :: :ok
def bootstrap_sync(date = %DateTime{}) do
# Loading transactions can take a lot of time to be achieve and can overpass an epoch.
# So to avoid missing a beacon summary epoch, we save the starting date and update the last sync date with it
# at the end of loading (in case there is a crash during self repair).
Expand All @@ -36,7 +36,7 @@ defmodule Archethic.SelfRepair do
# as no data have been aggregated
if DateTime.diff(DateTime.utc_now(), summary_time) >= 0 do
start_date = DateTime.utc_now()
:ok = Sync.load_missed_transactions(date, patch)
:ok = Sync.load_missed_transactions(date)
put_last_sync_date(start_date)

# At the end of self repair, if a new beacon summary as been created
Expand All @@ -45,7 +45,7 @@ defmodule Archethic.SelfRepair do
|> BeaconChain.previous_summary_time()
|> DateTime.compare(start_date) do
:gt ->
bootstrap_sync(start_date, patch)
bootstrap_sync(start_date)

_ ->
:ok
Expand Down
8 changes: 1 addition & 7 deletions lib/archethic/self_repair/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule Archethic.SelfRepair.Scheduler do
@vsn Mix.Project.config()[:version]

alias Archethic.P2P
alias Archethic.P2P.Node

alias Archethic.SelfRepair.Sync

Expand Down Expand Up @@ -94,7 +93,7 @@ defmodule Archethic.SelfRepair.Scheduler do
# So to avoid missing a beacon summary epoch, we save the starting date and update the last sync date with it
# at the end of loading (in case there is a crash during self repair)
start_date = DateTime.utc_now()
:ok = Sync.load_missed_transactions(last_sync_date, get_node_patch())
:ok = Sync.load_missed_transactions(last_sync_date)
{:ok, start_date}
end)

Expand Down Expand Up @@ -136,11 +135,6 @@ defmodule Archethic.SelfRepair.Scheduler do
end
end

defp get_node_patch do
%Node{network_patch: network_patch} = P2P.get_node_info()
network_patch
end

defp update_last_sync_date(date = %DateTime{}) do
next_sync_date = Utils.truncate_datetime(date)
:ok = Sync.store_last_sync_date(next_sync_date)
Expand Down
Loading

0 comments on commit 380cbea

Please sign in to comment.