diff --git a/config/dev.exs b/config/dev.exs index b54f4d7ec..4f2333e18 100755 --- a/config/dev.exs +++ b/config/dev.exs @@ -117,6 +117,8 @@ config :archethic, Archethic.SharedSecrets.NodeRenewalScheduler, config :archethic, Archethic.P2P.Listener, port: System.get_env("ARCHETHIC_P2P_PORT", "3002") |> String.to_integer() +config :archethic, Archethic.Utils.DetectNodeResponsiveness, timeout: 1_000 + config :archethic, ArchethicWeb.FaucetController, enabled: true # For development, we disable any cache and enable diff --git a/config/prod.exs b/config/prod.exs index 898d64510..c1f8a1df1 100755 --- a/config/prod.exs +++ b/config/prod.exs @@ -212,6 +212,8 @@ config :archethic, Archethic.P2P.BootstrappingSeeds, # TODO: define the default list of P2P seeds once the network will be more open to new miners genesis_seeds: System.get_env("ARCHETHIC_P2P_BOOTSTRAPPING_SEEDS") +config :archethic, Archethic.Utils.DetectNodeResponsiveness, timeout: 10_000 + config :archethic, ArchethicWeb.FaucetController, enabled: System.get_env("ARCHETHIC_NETWORK_TYPE") == "testnet" diff --git a/config/test.exs b/config/test.exs index 61eb1c90d..21f003ea1 100755 --- a/config/test.exs +++ b/config/test.exs @@ -142,6 +142,7 @@ config :archethic, Archethic.TransactionChain.MemTables.KOLedger, enabled: false config :archethic, Archethic.TransactionChain.MemTablesLoader, enabled: false config :archethic, ArchethicWeb.FaucetController, enabled: true +config :archethic, Archethic.Utils.DetectNodeResponsiveness, timeout: 1_000 # We don't run a server during test. If one is required, # you can enable the server option below. diff --git a/lib/archethic/contracts/worker.ex b/lib/archethic/contracts/worker.ex index 5c40d1b0c..912a492d6 100644 --- a/lib/archethic/contracts/worker.ex +++ b/lib/archethic/contracts/worker.ex @@ -32,7 +32,7 @@ defmodule Archethic.Contracts.Worker do alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.TransactionMovement alias Archethic.Utils - + alias Archethic.Utils.DetectNodeResponsiveness require Logger use GenServer @@ -256,23 +256,44 @@ defmodule Archethic.Contracts.Worker do defp schedule_trigger(_), do: :ok defp handle_new_transaction(next_transaction = %Transaction{}) do - [%Node{first_public_key: key} | _] = - next_transaction - |> Transaction.previous_address() - |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) + validation_nodes = get_validation_nodes(next_transaction) # The first storage node of the contract initiate the sending of the new transaction - if key == Crypto.first_node_public_key() do - validation_nodes = P2P.authorized_and_available_nodes() - + if trigger_node?(validation_nodes) do P2P.broadcast_message(validation_nodes, %StartMining{ transaction: next_transaction, validation_node_public_keys: Enum.map(validation_nodes, & &1.last_public_key), welcome_node_public_key: Crypto.last_node_public_key() }) + else + DetectNodeResponsiveness.start_link(next_transaction.address, fn count -> + Logger.info("contract transaction ...attempt #{count}") + + if trigger_node?(validation_nodes, count) do + P2P.broadcast_message(validation_nodes, %StartMining{ + transaction: next_transaction, + validation_node_public_keys: Enum.map(validation_nodes, & &1.last_public_key), + welcome_node_public_key: Crypto.last_node_public_key() + }) + end + end) end end + defp get_validation_nodes(next_transaction = %Transaction{}) do + next_transaction + |> Transaction.previous_address() + |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) + end + + defp trigger_node?(validation_nodes, count \\ 0) do + %Node{first_public_key: key} = + validation_nodes + |> Enum.at(count) + + key == Crypto.first_node_public_key() + end + defp chain_transaction( next_tx, prev_tx = %Transaction{ diff --git a/lib/archethic/mining.ex b/lib/archethic/mining.ex index 403512494..59e3f77eb 100644 --- a/lib/archethic/mining.ex +++ b/lib/archethic/mining.ex @@ -169,8 +169,8 @@ defmodule Archethic.Mining do |> DistributedWorkflow.add_cross_validation_stamp(stamp) end - defp get_mining_process!(tx_address) do - retry_while with: exponential_backoff(100, 2) |> expiry(@mining_timeout) do + defp get_mining_process!(tx_address, timeout \\ @mining_timeout) do + retry_while with: exponential_backoff(100, 2) |> expiry(timeout) do case Registry.lookup(WorkflowRegistry, tx_address) do [{pid, _}] -> {:halt, pid} @@ -181,6 +181,20 @@ defmodule Archethic.Mining do end end + @doc """ + Return true if the transaction is in mining process + """ + @spec processing?(binary()) :: boolean() + def processing?(tx_address) do + case get_mining_process!(tx_address, 100) do + nil -> + false + + _pid -> + true + end + end + @doc """ Validate a pending transaction """ diff --git a/lib/archethic/oracle_chain/scheduler.ex b/lib/archethic/oracle_chain/scheduler.ex index 3d769a109..e203b4c6a 100644 --- a/lib/archethic/oracle_chain/scheduler.ex +++ b/lib/archethic/oracle_chain/scheduler.ex @@ -4,7 +4,7 @@ defmodule Archethic.OracleChain.Scheduler do """ alias Archethic.Crypto - + alias Archethic.DB alias Archethic.Election alias Archethic.P2P @@ -18,7 +18,7 @@ defmodule Archethic.OracleChain.Scheduler do alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.TransactionData - + alias Archethic.Utils.DetectNodeResponsiveness alias Crontab.CronExpression.Parser, as: CronParser alias Crontab.Scheduler, as: CronScheduler @@ -140,21 +140,58 @@ defmodule Archethic.OracleChain.Scheduler do index = Map.fetch!(indexes, summary_date) - if trigger_node?(summary_date, index + 1) do - new_oracle_data = - summary_date - |> Crypto.derive_oracle_address(index) - |> get_oracle_data() - |> Services.fetch_new_data() + new_oracle_data = get_new_oracle_data(summary_date, index) + + authorized_nodes = + summary_date + |> P2P.authorized_nodes() + |> Enum.filter(& &1.available?) + + storage_nodes = + summary_date + |> Crypto.derive_oracle_address(index + 1) + |> Election.storage_nodes(authorized_nodes) + + tx = build_oracle_transaction(summary_date, index, new_oracle_data) - if Enum.empty?(new_oracle_data) do - Logger.debug("Oracle transaction skipped - no new data") + watcher_pid = + with {:empty, false} <- {:empty, Enum.empty?(new_oracle_data)}, + {:trigger, true} <- {:trigger, trigger_node?(storage_nodes)}, + {:exists, false} <- {:exists, DB.transaction_exists?(tx.address)} do + send_polling_transaction(tx) + nil else - send_polling_transaction(new_oracle_data, index, summary_date) + {:empty, true} -> + Logger.debug("Oracle transaction skipped - no new data") + nil + + {:trigger, false} -> + {:ok, pid} = + DetectNodeResponsiveness.start_link(tx.address, fn count -> + new_oracle_data = get_new_oracle_data(summary_date, index) + new_data? = !Enum.empty?(new_oracle_data) + + if trigger_node?(storage_nodes, count) and new_data? do + Logger.info("Oracle polling transaction ...attempt #{count}", + transaction_address: Base.encode16(tx.address), + transaction_type: :oracle + ) + + tx = build_oracle_transaction(summary_date, index, new_oracle_data) + send_polling_transaction(tx) + end + end) + + pid + + {:exists, true} -> + Logger.warning("Transaction already exists - before sending", + transaction_address: Base.encode16(tx.address), + transaction_type: :oracle + ) + + nil end - else - Logger.debug("Oracle transaction skipped - not the trigger node") - end current_time = DateTime.utc_now() |> DateTime.truncate(:second) next_polling_date = next_date(polling_interval, current_time) @@ -163,6 +200,7 @@ defmodule Archethic.OracleChain.Scheduler do data |> Map.put(:polling_date, next_polling_date) |> Map.put(:polling_timer, schedule_new_polling(next_polling_date, current_time)) + |> Map.put(:oracle_watcher_pid, watcher_pid) {:next_state, :ready, new_data} end @@ -177,20 +215,71 @@ defmodule Archethic.OracleChain.Scheduler do Logger.debug("Oracle summary - state: #{inspect(data)}") index = Map.fetch!(indexes, summary_date) + validation_nodes = get_validation_nodes(summary_date, index + 1) - if trigger_node?(summary_date, index + 1) do - Logger.debug("Oracle transaction summary sending") - send_summary_transaction(summary_date, index) - else - Logger.debug("Oracle summary skipped - not the trigger node") + # Stop previous oracle retries when the summary is triggered + case Map.get(data, :oracle_watcher_pid) do + nil -> + :ok + + pid -> + Process.exit(pid, :normal) end + tx_address = + summary_date + |> Crypto.derive_oracle_keypair(index + 1) + |> elem(0) + |> Crypto.derive_address() + + summary_watcher_pid = + with {:trigger, true} <- {:trigger, trigger_node?(validation_nodes)}, + {:exists, false} <- {:exists, DB.transaction_exists?(tx_address)} do + Logger.debug("Oracle transaction summary sending", + transaction_address: Base.encode16(tx_address), + transaction_type: :oracle_summary + ) + + send_summary_transaction(summary_date, index) + nil + else + {:trigger, false} -> + Logger.debug("Oracle summary skipped - not the trigger node", + transaction_address: Base.encode16(tx_address), + transaction_type: :oracle_summary + ) + + {:ok, pid} = + DetectNodeResponsiveness.start_link(tx_address, fn count -> + if trigger_node?(validation_nodes, count) do + Logger.info("Oracle summary transaction ...attempt #{count}", + transaction_address: Base.encode16(tx_address), + transaction_type: :oracle_summary + ) + + send_summary_transaction(summary_date, index) + end + end) + + pid + + {:exists, true} -> + Logger.warning("Oracle transaction already exists", + transaction_address: Base.encode16(tx_address), + transaction_type: :oracle_summary + ) + + nil + end + current_time = DateTime.utc_now() |> DateTime.truncate(:second) next_summary_date = next_date(summary_interval, current_time) Logger.info("Next Oracle Summary at #{DateTime.to_string(next_summary_date)}") new_data = data + |> Map.put(:summary_watcher_pid, summary_watcher_pid) + |> Map.delete(:oracle_watcher_pid) |> Map.put(:summary_date, next_summary_date) |> Map.update!(:indexes, fn indexes -> # Clean previous indexes @@ -371,63 +460,53 @@ defmodule Archethic.OracleChain.Scheduler do ) end - defp trigger_node?(summary_date = %DateTime{}, index) do - authorized_nodes = P2P.authorized_nodes(summary_date) |> Enum.filter(& &1.available?) - - storage_nodes = - summary_date - |> Crypto.derive_oracle_address(index) - |> Election.storage_nodes(authorized_nodes) + defp trigger_node?(validation_nodes, count \\ 0) do + %Node{first_public_key: initiator_key} = + validation_nodes + |> Enum.at(count) - node_public_key = Crypto.first_node_public_key() + initiator_key == Crypto.first_node_public_key() + end - case storage_nodes do - [%Node{first_public_key: ^node_public_key} | _] -> - true + defp send_polling_transaction(tx) do + Task.start(fn -> Archethic.send_new_transaction(tx) end) - _ -> - false - end + Logger.debug("New data pushed to the oracle", + transaction_address: Base.encode16(tx.address), + transaction_type: :oracle + ) end - defp send_polling_transaction(oracle_data, index, summary_date) do + defp build_oracle_transaction(summary_date, index, oracle_data) do {prev_pub, prev_pv} = Crypto.derive_oracle_keypair(summary_date, index) {next_pub, _} = Crypto.derive_oracle_keypair(summary_date, index + 1) - tx = - Transaction.new_with_keys( - :oracle, - %TransactionData{ - content: Jason.encode!(oracle_data), - code: ~S""" - condition inherit: [ - # We need to ensure the type stays consistent - # So we can apply specific rules during the transaction validation - type: in?([oracle, oracle_summary]), - - # We discard the content and code verification - content: true, - - # We ensure the code stay the same - code: if type == oracle_summary do - regex_match?("condition inherit: \\[[\\s].*content: \\\"\\\"[\\s].*]") - else - previous.code - end - ] - """ - }, - prev_pv, - prev_pub, - next_pub - ) - - Task.start(fn -> Archethic.send_new_transaction(tx) end) - - Logger.debug("New data pushed to the oracle", - transaction_address: Base.encode16(tx.address), - transaction_type: :oracle + Transaction.new_with_keys( + :oracle, + %TransactionData{ + content: Jason.encode!(oracle_data), + code: ~S""" + condition inherit: [ + # We need to ensure the type stays consistent + # So we can apply specific rules during the transaction validation + type: in?([oracle, oracle_summary]), + + # We discard the content and code verification + content: true, + + # We ensure the code stay the same + code: if type == oracle_summary do + regex_match?("condition inherit: \\[[\\s].*content: \\\"\\\"[\\s].*]") + else + previous.code + end + ] + """ + }, + prev_pv, + prev_pub, + next_pub ) end @@ -461,13 +540,21 @@ defmodule Archethic.OracleChain.Scheduler do next_pub ) - Logger.debug( - "Sending oracle summary transaction - aggregation: #{inspect(aggregated_content)}", - transaction_address: Base.encode16(tx.address), - transaction_type: :oracle_summary - ) + if DB.transaction_exists?(tx.address) do + Logger.debug( + "Transaction Already Exists:oracle summary transaction - aggregation: #{inspect(aggregated_content)}", + transaction_address: Base.encode16(tx.address), + transaction_type: :oracle_summary + ) + else + Logger.debug( + "Sending oracle summary transaction - aggregation: #{inspect(aggregated_content)}", + transaction_address: Base.encode16(tx.address), + transaction_type: :oracle_summary + ) - Task.start(fn -> Archethic.send_new_transaction(tx) end) + Task.start(fn -> Archethic.send_new_transaction(tx) end) + end end defp get_chain(address, opts \\ [], acc \\ []) do @@ -512,4 +599,19 @@ defmodule Archethic.OracleChain.Scheduler do |> DateTime.from_naive!("Etc/UTC") end end + + defp get_validation_nodes(summary_date, index) do + authorized_nodes = P2P.authorized_nodes(summary_date) |> Enum.filter(& &1.available?) + + summary_date + |> Crypto.derive_oracle_address(index) + |> Election.storage_nodes(authorized_nodes) + end + + defp get_new_oracle_data(summary_date, index) do + summary_date + |> Crypto.derive_oracle_address(index) + |> get_oracle_data() + |> Services.fetch_new_data() + end end diff --git a/lib/archethic/shared_secrets/node_renewal.ex b/lib/archethic/shared_secrets/node_renewal.ex index 46edcfbe3..7ca6f44a4 100644 --- a/lib/archethic/shared_secrets/node_renewal.ex +++ b/lib/archethic/shared_secrets/node_renewal.ex @@ -28,12 +28,12 @@ defmodule Archethic.SharedSecrets.NodeRenewal do @doc """ Determine if the local node is the initiator of the node renewal """ - @spec initiator?() :: boolean() - def initiator? do + @spec initiator? :: boolean() + def initiator?(index \\ 0) do %Node{first_public_key: initiator_key} = next_address() |> Election.storage_nodes(P2P.authorized_and_available_nodes()) - |> List.first() + |> Enum.at(index) initiator_key == Crypto.first_node_public_key() end diff --git a/lib/archethic/shared_secrets/node_renewal_scheduler.ex b/lib/archethic/shared_secrets/node_renewal_scheduler.ex index 168d96618..152266aee 100644 --- a/lib/archethic/shared_secrets/node_renewal_scheduler.ex +++ b/lib/archethic/shared_secrets/node_renewal_scheduler.ex @@ -28,6 +28,7 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do alias Archethic.SharedSecrets.NodeRenewal alias Archethic.Utils + alias Archethic.Utils.DetectNodeResponsiveness require Logger @@ -117,9 +118,23 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do "Node shared secrets will be renewed in #{Utils.remaining_seconds_from_timer(timer)}" ) + tx = + NodeRenewal.next_authorized_node_public_keys() + |> NodeRenewal.new_node_shared_secrets_transaction( + :crypto.strong_rand_bytes(32), + :crypto.strong_rand_bytes(32) + ) + if NodeRenewal.initiator?() do Logger.info("Node shared secrets renewal creation...") - make_renewal() + make_renewal(tx) + else + DetectNodeResponsiveness.start_link(tx.address, fn count -> + if NodeRenewal.initiator?(count) do + Logger.info("Node shared secret renewal creation...attempt #{count}") + make_renewal(tx) + end + end) end {:noreply, Map.put(state, :timer, timer), :hibernate} @@ -135,14 +150,7 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do end end - defp make_renewal do - tx = - NodeRenewal.next_authorized_node_public_keys() - |> NodeRenewal.new_node_shared_secrets_transaction( - :crypto.strong_rand_bytes(32), - :crypto.strong_rand_bytes(32) - ) - + defp make_renewal(tx) do Archethic.send_new_transaction(tx) Logger.info( diff --git a/lib/archethic/utils/detect_node_responsiveness.ex b/lib/archethic/utils/detect_node_responsiveness.ex new file mode 100644 index 000000000..9b16ddcee --- /dev/null +++ b/lib/archethic/utils/detect_node_responsiveness.ex @@ -0,0 +1,53 @@ +defmodule Archethic.Utils.DetectNodeResponsiveness do + @moduledoc """ + Detects the nodes responsiveness based on timeouts + """ + @default_timeout Application.compile_env(:archethic, __MODULE__) |> Keyword.get(:timeout, 5_000) + alias Archethic.P2P + alias Archethic.DB + alias Archethic.Mining + + use GenStateMachine + require Logger + + def start_link(address, replaying_fn, timeout \\ @default_timeout) do + GenStateMachine.start_link(__MODULE__, [address, replaying_fn, timeout], []) + end + + def init([address, replaying_fn, timeout]) do + schedule_timeout(timeout) + {:ok, :waiting, %{address: address, replaying_fn: replaying_fn, count: 1, timeout: timeout}} + end + + def handle_event( + :info, + :soft_timeout, + :waiting, + state = %{ + address: address, + replaying_fn: replaying_fn, + count: count, + timeout: timeout + } + ) do + with false <- DB.transaction_exists?(address), + false <- Mining.processing?(address), + true <- count < length(P2P.authorized_and_available_nodes()) do + Logger.info("calling replay fn with count=#{count}", + transaction_address: Base.encode16(address) + ) + + replaying_fn.(count) + schedule_timeout(timeout) + new_count = count + 1 + {:keep_state, %{state | count: new_count}} + else + # hard_timeout + _ -> :stop + end + end + + defp schedule_timeout(interval, pid \\ self()) do + Process.send_after(pid, :soft_timeout, interval) + end +end diff --git a/test/archethic/utils/detect_node_responsiveness_test.exs b/test/archethic/utils/detect_node_responsiveness_test.exs new file mode 100644 index 000000000..6a0499ff7 --- /dev/null +++ b/test/archethic/utils/detect_node_responsiveness_test.exs @@ -0,0 +1,332 @@ +defmodule Archethic.Utils.DetectNodeResponsivenessTest do + use ArchethicCase + @timeout 200 + @sleep_timeout 350 + + alias Archethic.Utils.DetectNodeResponsiveness + + alias Archethic.Mining.WorkflowRegistry + + import Mox + alias Archethic.Crypto + alias Archethic.P2P + alias Archethic.P2P.Node + + test "start_link/2 for start state" do + address = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + + replaying_fn = fn count -> + count + end + + {:ok, pid} = DetectNodeResponsiveness.start_link(address, replaying_fn, @timeout) + assert true == Process.alive?(pid) + end + + test "start_link/2 for hard timeout state" do + address = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + # dummy replaying fn with count as argument + + replaying_fn = fn count -> + count + end + + {:ok, pid} = DetectNodeResponsiveness.start_link(address, replaying_fn, @timeout) + + Process.sleep(@sleep_timeout) + assert false == Process.alive?(pid) + end + + test "start_link/2 for soft timeout state" do + address = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + + me = self() + + replaying_fn = fn count -> + send(me, :replay) + count + end + + {pub1, _} = Crypto.derive_keypair("node1", 0) + {pub2, _} = Crypto.derive_keypair("node2", 0) + {pub3, _} = Crypto.derive_keypair("node3", 0) + {pub4, _} = Crypto.derive_keypair("node4", 0) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + authorized?: true, + last_public_key: pub1, + first_public_key: pub1, + available?: true, + geo_patch: "AAA", + network_patch: "AAA", + enrollment_date: DateTime.utc_now(), + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + }) + + P2P.add_and_connect_node(%Node{ + first_public_key: pub2, + last_public_key: pub2, + authorized?: true, + available?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + geo_patch: "AAA", + network_patch: "AAA", + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + enrollment_date: DateTime.utc_now() + }) + + P2P.add_and_connect_node(%Node{ + first_public_key: pub3, + last_public_key: pub3, + authorized?: true, + available?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + geo_patch: "AAA", + network_patch: "AAA", + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + enrollment_date: DateTime.utc_now() + }) + + P2P.add_and_connect_node(%Node{ + first_public_key: pub4, + last_public_key: pub4, + authorized?: true, + available?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + geo_patch: "AAA", + network_patch: "AAA", + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + enrollment_date: DateTime.utc_now() + }) + + {:ok, pid} = DetectNodeResponsiveness.start_link(address, replaying_fn, @timeout) + + MockDB + |> stub(:transaction_exists?, fn ^address -> + false + end) + + # first soft_timeout + assert true == Process.alive?(pid) + assert_receive :replay, @sleep_timeout + # second soft_timeout + assert_receive :replay, @sleep_timeout + assert true == Process.alive?(pid) + # third soft_timeout + assert_receive :replay, @sleep_timeout + assert true == Process.alive?(pid) + # last timeout leading to stop as hard_timeout + Process.sleep(@sleep_timeout) + assert false == Process.alive?(pid) + end + + test "should not retry if the transaction exists" do + address = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + + me = self() + + replaying_fn = fn _count -> + send(me, :replay) + end + + {pub1, _} = Crypto.derive_keypair("node1", 0) + {pub2, _} = Crypto.derive_keypair("node2", 0) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + authorized?: true, + last_public_key: pub1, + first_public_key: pub1, + available?: true, + geo_patch: "AAA", + network_patch: "AAA", + enrollment_date: DateTime.utc_now(), + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + }) + + P2P.add_and_connect_node(%Node{ + first_public_key: pub2, + last_public_key: pub2, + authorized?: true, + available?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + geo_patch: "AAA", + network_patch: "AAA", + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + enrollment_date: DateTime.utc_now() + }) + + {:ok, pid} = DetectNodeResponsiveness.start_link(address, replaying_fn, @timeout) + + MockDB + |> stub(:transaction_exists?, fn ^address -> + Process.send_after(me, :transaction_stored, 50) + true + end) + + # first soft_timeout + assert_receive :transaction_stored, @sleep_timeout + assert !Process.alive?(pid) + end + + test "should retry after the first timeout" do + address = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + + me = self() + + replaying_fn = fn _count -> + send(me, :replay) + end + + {pub1, _} = Crypto.derive_keypair("node1", 0) + {pub2, _} = Crypto.derive_keypair("node2", 0) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + authorized?: true, + last_public_key: pub1, + first_public_key: pub1, + available?: true, + geo_patch: "AAA", + network_patch: "AAA", + enrollment_date: DateTime.utc_now(), + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + }) + + P2P.add_and_connect_node(%Node{ + first_public_key: pub2, + last_public_key: pub2, + authorized?: true, + available?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + geo_patch: "AAA", + network_patch: "AAA", + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + enrollment_date: DateTime.utc_now() + }) + + {:ok, pid} = DetectNodeResponsiveness.start_link(address, replaying_fn, @timeout) + + MockDB + |> expect(:transaction_exists?, fn ^address -> + false + end) + |> expect(:transaction_exists?, fn ^address -> + Process.send_after(me, :transaction_stored, 50) + true + end) + + assert_receive :replay, @sleep_timeout + assert_receive :transaction_stored, @sleep_timeout + assert !Process.alive?(pid) + end + + test "should run all the nodes to force the retry" do + address = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + + me = self() + + replaying_fn = fn _count -> + send(me, :replay) + end + + {pub1, _} = Crypto.derive_keypair("node1", 0) + {pub2, _} = Crypto.derive_keypair("node2", 0) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + authorized?: true, + last_public_key: pub1, + first_public_key: pub1, + available?: true, + geo_patch: "AAA", + network_patch: "AAA", + enrollment_date: DateTime.utc_now(), + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + }) + + P2P.add_and_connect_node(%Node{ + first_public_key: pub2, + last_public_key: pub2, + authorized?: true, + available?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + geo_patch: "AAA", + network_patch: "AAA", + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + enrollment_date: DateTime.utc_now() + }) + + {:ok, pid} = DetectNodeResponsiveness.start_link(address, replaying_fn, @timeout) + + MockDB + |> stub(:transaction_exists?, fn ^address -> + false + end) + + assert_receive :replay, @sleep_timeout + Process.sleep(@sleep_timeout) + assert !Process.alive?(pid) + end + + test "should not retry if the transaction is in mining process" do + address = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + + me = self() + + replaying_fn = fn _count -> + send(me, :replay) + end + + {pub1, _} = Crypto.derive_keypair("node1", 0) + {pub2, _} = Crypto.derive_keypair("node2", 0) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + authorized?: true, + last_public_key: pub1, + first_public_key: pub1, + available?: true, + geo_patch: "AAA", + network_patch: "AAA", + enrollment_date: DateTime.utc_now(), + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + }) + + P2P.add_and_connect_node(%Node{ + first_public_key: pub2, + last_public_key: pub2, + authorized?: true, + available?: true, + authorization_date: DateTime.utc_now() |> DateTime.add(-10), + geo_patch: "AAA", + network_patch: "AAA", + reward_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>, + enrollment_date: DateTime.utc_now() + }) + + {:ok, pid} = DetectNodeResponsiveness.start_link(address, replaying_fn, @timeout) + + MockDB + |> stub(:transaction_exists?, fn ^address -> + false + end) + + Registry.register(WorkflowRegistry, address, []) + + # first soft_timeout + Process.sleep(@sleep_timeout) + assert !Process.alive?(pid) + end +end