Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix request timeout #766

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 26 additions & 24 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ defmodule Archethic.Bootstrap do
&(&1.first_public_key == Crypto.first_node_public_key())
)

do_resync_network_chain([:oracle, :node_shared_secrets], nodes)
[:oracle, :node_shared_secrets, :reward]
|> Enum.each(&do_resync_network_chain(&1, nodes))
end
end

Expand All @@ -240,34 +241,31 @@ defmodule Archethic.Bootstrap do

# by type: Get gen addr, get last address (remotely & locally)
# compare, if dont match, fetch last tx remotely
def do_resync_network_chain(type_list, nodes) when is_list(nodes) do
Task.Supervisor.async_stream_nolink(Archethic.TaskSupervisor, type_list, fn type ->
with addr when is_binary(addr) <- get_genesis_addr(type),
{:ok, rem_last_addr} <- TransactionChain.resolve_last_address(addr),
{local_last_addr, _} <- TransactionChain.get_last_address(addr),
false <- rem_last_addr == local_last_addr,
{:ok, tx} <- TransactionChain.fetch_transaction_remotely(rem_last_addr, nodes),
:ok <- Replication.validate_and_store_transaction_chain(tx) do
Logger.info("Enforced Resync: Success", transaction_type: type)
def do_resync_network_chain(type, nodes) when is_list(nodes) do
with addr when is_binary(addr) <- get_genesis_addr(type),
{:ok, rem_last_addr} <- TransactionChain.resolve_last_address(addr),
{local_last_addr, _} <- TransactionChain.get_last_address(addr),
false <- rem_last_addr == local_last_addr,
{:ok, tx} <- TransactionChain.fetch_transaction_remotely(rem_last_addr, nodes),
:ok <- Replication.validate_and_store_transaction_chain(tx) do
Logger.info("Enforced Resync: Success", transaction_type: type)
:ok
else
true ->
Logger.info("Enforced Resync: No new transaction to sync", transaction_type: type)
:ok
else
true ->
Logger.info("Enforced Resync: No new transaction to sync", transaction_type: type)
:ok

e when e in [nil, []] ->
Logger.debug("Enforced Resync: Transaction not available", transaction_type: type)
:ok
e when e in [nil, []] ->
Logger.debug("Enforced Resync: Transaction not available", transaction_type: type)
:ok

e ->
Logger.debug("Enforced Resync: Unexpected Error", transaction_type: type)
Logger.debug(e)
end
end)
|> Stream.run()
e ->
Logger.debug("Enforced Resync: Unexpected Error", transaction_type: type)
Logger.debug(e)
end
end

@spec get_genesis_addr(:node_shared_secrets | :oracle) :: binary() | nil
@spec get_genesis_addr(:node_shared_secrets | :oracle | :reward) :: binary() | nil
defp get_genesis_addr(:oracle) do
Archethic.OracleChain.genesis_address().current |> elem(0)
end
Expand All @@ -276,6 +274,10 @@ defmodule Archethic.Bootstrap do
Archethic.SharedSecrets.genesis_address(:node_shared_secrets)
end

defp get_genesis_addr(:reward) do
Archethic.Reward.genesis_address()
end

defp first_initialization(
ip,
port,
Expand Down
10 changes: 5 additions & 5 deletions lib/archethic/mining/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ defmodule Archethic.Mining.TransactionContext do
alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Message.Ping
alias Archethic.P2P.Node
Expand Down Expand Up @@ -61,7 +60,7 @@ defmodule Archethic.Mining.TransactionContext do
prev_tx_task = request_previous_tx(previous_address, prev_tx_nodes_split)
nodes_view_task = request_nodes_view(node_public_keys)

prev_tx = Task.await(prev_tx_task, Message.get_max_timeout())
prev_tx = Task.await(prev_tx_task)
nodes_view = Task.await(nodes_view_task)

%{
Expand Down Expand Up @@ -102,15 +101,16 @@ defmodule Archethic.Mining.TransactionContext do
Task.Supervisor.async(
TaskSupervisor,
fn ->
case TransactionChain.fetch_transaction_remotely(previous_address, nodes) do
# Timeout of 4 sec because the coordinator node wait 5 sec to get the context
# from the cross validation nodes
case TransactionChain.fetch_transaction_remotely(previous_address, nodes, 4000) do
{:ok, tx} ->
tx

{:error, _} ->
nil
end
end,
timeout: Message.get_max_timeout()
end
)
end

Expand Down
6 changes: 3 additions & 3 deletions lib/archethic/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,10 @@ defmodule Archethic.Replication do

{previous_transaction, inputs} =
if self_repair? do
{Task.await(t1, Message.get_max_timeout()), []}
{Task.await(t1, Message.get_max_timeout() + 1000), []}
else
t2 = Task.Supervisor.async(TaskSupervisor, fn -> fetch_inputs(tx, download_nodes) end)
{Task.await(t1, Message.get_max_timeout()), Task.await(t2)}
inputs = fetch_inputs(tx, download_nodes)
{Task.await(t1, Message.get_max_timeout() + 1000), inputs}
end

Logger.debug("Previous transaction #{inspect(previous_transaction)}",
Expand Down
16 changes: 12 additions & 4 deletions lib/archethic/transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -687,14 +687,21 @@ defmodule Archethic.TransactionChain do

If no nodes are available to answer the request, `{:error, :network_issue}` is returned.
"""
@spec fetch_transaction_remotely(address :: Crypto.versioned_hash(), list(Node.t())) ::
@spec fetch_transaction_remotely(
address :: Crypto.versioned_hash(),
list(Node.t()),
non_neg_integer()
) ::
{:ok, Transaction.t()}
| {:error, :transaction_not_exists}
| {:error, :transaction_invalid}
| {:error, :network_issue}
def fetch_transaction_remotely(_, []), do: {:error, :transaction_not_exists}
def fetch_transaction_remotely(address, nodes, timeout \\ Message.get_max_timeout())

def fetch_transaction_remotely(_, [], _), do: {:error, :transaction_not_exists}

def fetch_transaction_remotely(address, nodes) when is_binary(address) and is_list(nodes) do
def fetch_transaction_remotely(address, nodes, timeout)
when is_binary(address) and is_list(nodes) do
conflict_resolver = fn results ->
# Prioritize transactions results over not found
with nil <- Enum.find(results, &match?(%Transaction{}, &1)),
Expand All @@ -709,7 +716,8 @@ defmodule Archethic.TransactionChain do
case P2P.quorum_read(
nodes,
%GetTransaction{address: address},
conflict_resolver
conflict_resolver,
timeout
) do
{:ok, %NotFound{}} ->
{:error, :transaction_not_exists}
Expand Down
6 changes: 3 additions & 3 deletions test/archethic/bootstrap_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ defmodule Archethic.BootstrapTest do

assert :ok =
Bootstrap.do_resync_network_chain(
[:node_shared_secrets],
:node_shared_secrets,
_nodes = P2P.authorized_and_available_nodes()
)
end
Expand Down Expand Up @@ -523,7 +523,7 @@ defmodule Archethic.BootstrapTest do

assert :ok =
Bootstrap.do_resync_network_chain(
[:node_shared_secrets],
:node_shared_secrets,
_nodes = P2P.authorized_and_available_nodes()
)

Expand Down Expand Up @@ -598,7 +598,7 @@ defmodule Archethic.BootstrapTest do

assert :ok =
Bootstrap.do_resync_network_chain(
[:node_shared_secrets],
:node_shared_secrets,
_nodes = P2P.authorized_and_available_nodes()
)

Expand Down