Skip to content

Commit

Permalink
Detection of unresponsive trigger node Contd of 376 (#395)
Browse files Browse the repository at this point in the history
* Added fsm to detect unresponsive trigger nodes

* Added tests for Node responsiveness detection

* Repolling oracle data to avoid consensus not reached

* Fixed oracle address derivation & new oracle polling tx during detection

* Add mining process control
  • Loading branch information
imnik11 committed Jul 5, 2022
1 parent dd0d0bc commit 113b6c3
Show file tree
Hide file tree
Showing 10 changed files with 630 additions and 95 deletions.
2 changes: 2 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ config :archethic, Archethic.SharedSecrets.NodeRenewalScheduler,
config :archethic, Archethic.P2P.Listener,
port: System.get_env("ARCHETHIC_P2P_PORT", "3002") |> String.to_integer()

config :archethic, Archethic.Utils.DetectNodeResponsiveness, timeout: 1_000

config :archethic, ArchethicWeb.FaucetController, enabled: true

# For development, we disable any cache and enable
Expand Down
2 changes: 2 additions & 0 deletions config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ config :archethic, Archethic.P2P.BootstrappingSeeds,
# TODO: define the default list of P2P seeds once the network will be more open to new miners
genesis_seeds: System.get_env("ARCHETHIC_P2P_BOOTSTRAPPING_SEEDS")

config :archethic, Archethic.Utils.DetectNodeResponsiveness, timeout: 10_000

config :archethic, ArchethicWeb.FaucetController,
enabled: System.get_env("ARCHETHIC_NETWORK_TYPE") == "testnet"

Expand Down
1 change: 1 addition & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ config :archethic, Archethic.TransactionChain.MemTables.KOLedger, enabled: false
config :archethic, Archethic.TransactionChain.MemTablesLoader, enabled: false

config :archethic, ArchethicWeb.FaucetController, enabled: true
config :archethic, Archethic.Utils.DetectNodeResponsiveness, timeout: 1_000

# We don't run a server during test. If one is required,
# you can enable the server option below.
Expand Down
37 changes: 29 additions & 8 deletions lib/archethic/contracts/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ defmodule Archethic.Contracts.Worker do
alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.TransactionMovement

alias Archethic.Utils

alias Archethic.Utils.DetectNodeResponsiveness
require Logger

use GenServer
Expand Down Expand Up @@ -256,23 +256,44 @@ defmodule Archethic.Contracts.Worker do
defp schedule_trigger(_), do: :ok

defp handle_new_transaction(next_transaction = %Transaction{}) do
[%Node{first_public_key: key} | _] =
next_transaction
|> Transaction.previous_address()
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
validation_nodes = get_validation_nodes(next_transaction)

# The first storage node of the contract initiate the sending of the new transaction
if key == Crypto.first_node_public_key() do
validation_nodes = P2P.authorized_and_available_nodes()

if trigger_node?(validation_nodes) do
P2P.broadcast_message(validation_nodes, %StartMining{
transaction: next_transaction,
validation_node_public_keys: Enum.map(validation_nodes, & &1.last_public_key),
welcome_node_public_key: Crypto.last_node_public_key()
})
else
DetectNodeResponsiveness.start_link(next_transaction.address, fn count ->
Logger.info("contract transaction ...attempt #{count}")

if trigger_node?(validation_nodes, count) do
P2P.broadcast_message(validation_nodes, %StartMining{
transaction: next_transaction,
validation_node_public_keys: Enum.map(validation_nodes, & &1.last_public_key),
welcome_node_public_key: Crypto.last_node_public_key()
})
end
end)
end
end

defp get_validation_nodes(next_transaction = %Transaction{}) do
next_transaction
|> Transaction.previous_address()
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
end

defp trigger_node?(validation_nodes, count \\ 0) do
%Node{first_public_key: key} =
validation_nodes
|> Enum.at(count)

key == Crypto.first_node_public_key()
end

defp chain_transaction(
next_tx,
prev_tx = %Transaction{
Expand Down
18 changes: 16 additions & 2 deletions lib/archethic/mining.ex
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ defmodule Archethic.Mining do
|> DistributedWorkflow.add_cross_validation_stamp(stamp)
end

defp get_mining_process!(tx_address) do
retry_while with: exponential_backoff(100, 2) |> expiry(@mining_timeout) do
defp get_mining_process!(tx_address, timeout \\ @mining_timeout) do
retry_while with: exponential_backoff(100, 2) |> expiry(timeout) do
case Registry.lookup(WorkflowRegistry, tx_address) do
[{pid, _}] ->
{:halt, pid}
Expand All @@ -181,6 +181,20 @@ defmodule Archethic.Mining do
end
end

@doc """
Return true if the transaction is in mining process
"""
@spec processing?(binary()) :: boolean()
def processing?(tx_address) do
case get_mining_process!(tx_address, 100) do
nil ->
false

_pid ->
true
end
end

@doc """
Validate a pending transaction
"""
Expand Down
Loading

0 comments on commit 113b6c3

Please sign in to comment.