diff --git a/lib/archethic/beacon_chain/slot_timer.ex b/lib/archethic/beacon_chain/slot_timer.ex index 0150ad460..20e0af558 100644 --- a/lib/archethic/beacon_chain/slot_timer.ex +++ b/lib/archethic/beacon_chain/slot_timer.ex @@ -22,6 +22,8 @@ defmodule Archethic.BeaconChain.SlotTimer do require Logger + @slot_timer_ets :archethic_slot_timer + @doc """ Create a new slot timer """ @@ -85,19 +87,36 @@ defmodule Archethic.BeaconChain.SlotTimer do end defp get_interval do - [{_, interval}] = :ets.lookup(:archethic_slot_timer, :interval) + [{_, interval}] = :ets.lookup(@slot_timer_ets, :interval) interval end @doc false def init(opts) do + :ets.new(@slot_timer_ets, [:named_table, :public, read_concurrency: true]) interval = Keyword.get(opts, :interval) - :ets.new(:archethic_slot_timer, [:named_table, :public, read_concurrency: true]) - :ets.insert(:archethic_slot_timer, {:interval, interval}) + :ets.insert(@slot_timer_ets, {:interval, interval}) + + case :persistent_term.get(:archethic_up, nil) do + nil -> + Logger.info("Slot Timer: Waiting for Node to complete Bootstrap.") + + Archethic.PubSub.register_to_node_up() + {:ok, %{interval: interval}} + + :up -> + Logger.info("Slot Timer: Starting...") + + {:ok, %{interval: interval, timer: schedule_new_slot(interval)}} + end + end + + def handle_info(:node_up, server_data = %{interval: interval}) do + Logger.info("Slot Timer: Starting...") - Logger.info("Starting SlotTimer") + new_server_data = Map.put(server_data, :timer, schedule_new_slot(interval)) - {:ok, %{interval: interval, timer: schedule_new_slot(interval)}, :hibernate} + {:noreply, new_server_data, :hibernate} end def handle_info( @@ -130,7 +149,7 @@ defmodule Archethic.BeaconChain.SlotTimer do {:noreply, state} new_interval -> - :ets.insert(:archethic_slot_timer, {:interval, new_interval}) + :ets.insert(@slot_timer_ets, {:interval, new_interval}) {:noreply, Map.put(state, :interval, new_interval)} end end diff --git a/lib/archethic/bootstrap.ex b/lib/archethic/bootstrap.ex index 580dd54ed..7f0e75af5 100644 --- a/lib/archethic/bootstrap.ex +++ b/lib/archethic/bootstrap.ex @@ -116,6 +116,8 @@ defmodule Archethic.Bootstrap do else post_bootstrap(sync?: false) end + + Logger.info("Bootstrapping finished!") end defp should_bootstrap?(_ip, _port, _http_port, _, nil), do: true @@ -195,8 +197,6 @@ defmodule Archethic.Bootstrap do end end end - - Logger.info("Bootstrapping finished!") end defp post_bootstrap(opts) do @@ -213,6 +213,7 @@ defmodule Archethic.Bootstrap do SelfRepair.start_scheduler() :persistent_term.put(:archethic_up, :up) + Archethic.PubSub.notify_node_up() Listener.listen() end diff --git a/lib/archethic/oracle_chain/scheduler.ex b/lib/archethic/oracle_chain/scheduler.ex index abad99f41..99e6cf979 100644 --- a/lib/archethic/oracle_chain/scheduler.ex +++ b/lib/archethic/oracle_chain/scheduler.ex @@ -49,38 +49,63 @@ defmodule Archethic.OracleChain.Scheduler do polling_interval = Keyword.fetch!(args, :polling_interval) summary_interval = Keyword.fetch!(args, :summary_interval) + state_data = + %{} + |> Map.put(:polling_interval, polling_interval) + |> Map.put(:summary_interval, summary_interval) + + case :persistent_term.get(:archethic_up, nil) do + nil -> + # node still bootstrapping , wait for it to finish Bootstrap + Logger.info(" Oracle Scheduler: Waiting for Node to complete Bootstrap. ") + + PubSub.register_to_node_up() + + {:ok, :idle, state_data} + + # wait for node UP + :up -> + # when node is already bootstrapped, - handles scheduler crash + {state, new_state_data, events} = start_scheduler(state_data) + {:ok, state, new_state_data, events} + end + end + + def start_scheduler(state_data) do + Logger.info("Oracle Scheduler: Starting... ") + PubSub.register_to_node_update() - Logger.info("Starting Oracle Scheduler") case P2P.get_node_info(Crypto.first_node_public_key()) do # Schedule polling for authorized node # This case may happen in case of process restart after crash {:ok, %Node{authorized?: true, available?: true}} -> summary_date = - next_date(summary_interval, DateTime.utc_now() |> DateTime.truncate(:second)) + next_date( + Map.get(state_data, :summary_interval), + DateTime.utc_now() |> DateTime.truncate(:second) + ) PubSub.register_to_new_transaction_by_type(:oracle) index = chain_size(summary_date) Logger.info("Oracle Scheduler: Scheduled during init - (index: #{index})") - {:ok, :idle, - %{ - polling_interval: polling_interval, - summary_interval: summary_interval, - summary_date: summary_date, - indexes: %{summary_date => index} - }, {:next_event, :internal, :schedule}} + new_state_data = + state_data + |> Map.put(:summary_date, summary_date) + |> Map.put(:indexes, %{summary_date => index}) + + {:idle, new_state_data, {:next_event, :internal, :schedule}} _ -> Logger.info("Oracle Scheduler: waiting for Node Update Message") - {:ok, :idle, - %{ - polling_interval: polling_interval, - summary_interval: summary_interval, - indexes: %{} - }} + new_state_data = + state_data + |> Map.put(:indexes, %{}) + + {:idle, new_state_data, []} end end @@ -133,6 +158,12 @@ defmodule Archethic.OracleChain.Scheduler do {:next_state, :scheduled, new_data} end + def handle_event(:info, :node_up, :idle, state_data) do + PubSub.unregister_to_node_up() + {:idle, new_state_data, events} = start_scheduler(state_data) + {:keep_state, new_state_data, events} + end + def handle_event( :info, {:new_transaction, address, :oracle, _timestamp}, diff --git a/lib/archethic/pub_sub.ex b/lib/archethic/pub_sub.ex index d32a419b4..ef9a0842b 100644 --- a/lib/archethic/pub_sub.ex +++ b/lib/archethic/pub_sub.ex @@ -103,6 +103,30 @@ defmodule Archethic.PubSub do ) end + @doc """ + Notify that Node is Up + """ + @spec notify_node_up() :: :ok + def notify_node_up() do + dispatch(:node_up, :node_up) + end + + @spec register_to_node_up :: {:error, {:already_registered, pid}} | {:ok, pid} + @doc """ + Register a process to notify node up + """ + def register_to_node_up() do + Registry.register(PubSubRegistry, :node_up, []) + end + + @doc """ + UnRegister a process to notification node up + """ + @spec unregister_to_node_up :: :ok + def unregister_to_node_up() do + Registry.unregister(PubSubRegistry, :node_up) + end + @doc """ Register a process to a new transaction publication by type """ diff --git a/lib/archethic/reward/scheduler.ex b/lib/archethic/reward/scheduler.ex index 5c2ff3845..a3e02d6bd 100644 --- a/lib/archethic/reward/scheduler.ex +++ b/lib/archethic/reward/scheduler.ex @@ -23,6 +23,7 @@ defmodule Archethic.Reward.Scheduler do require Logger + @spec start_link(any) :: :ignore | {:error, any} | {:ok, pid} def start_link(args \\ []) do GenStateMachine.start_link(__MODULE__, args, name: __MODULE__) end @@ -37,8 +38,30 @@ defmodule Archethic.Reward.Scheduler do def init(args) do interval = Keyword.fetch!(args, :interval) + state_data = Map.put(%{}, :interval, interval) + + case :persistent_term.get(:archethic_up, nil) do + nil -> + Logger.info(" Reward Scheduler: Waiting for Node to complete Bootstrap. ") + + PubSub.register_to_node_up() + {:ok, :idle, state_data} + + # wait for node up + + :up -> + {state, new_state_data, events} = start_scheduler(state_data) + {:ok, state, new_state_data, events} + end + end + + @doc """ + Computers start parameters for the scheduler + """ + def start_scheduler(state_data) do + Logger.info("Reward Scheduler: Starting... ") + PubSub.register_to_node_update() - Logger.info("Starting Reward Scheduler") case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do %Node{authorized?: true, available?: true} -> @@ -48,17 +71,26 @@ defmodule Archethic.Reward.Scheduler do index = Crypto.number_of_network_pool_keys() Logger.info("Reward Scheduler scheduled during init - (index: #{index})") - {:ok, :idle, - %{interval: interval, index: index, next_address: Reward.next_address(index)}, + {:idle, + state_data + |> Map.put(:index, index) + |> Map.put(:next_address, Reward.next_address(index)), {:next_event, :internal, :schedule}} _ -> Logger.info("Reward Scheduler waiting for Node Update Message") - {:ok, :idle, %{interval: interval}} + {:idle, state_data, []} end end + def handle_event(:info, :node_up, :idle, state_data) do + # Node is up start Scheduler + PubSub.unregister_to_node_up() + {:idle, new_state_data, events} = start_scheduler(state_data) + {:keep_state, new_state_data, events} + end + def handle_event( :info, {:node_update, @@ -138,41 +170,13 @@ defmodule Archethic.Reward.Scheduler do :triggered, data = %{index: index} ) do - next_index = index + 1 - next_address = Reward.next_address(next_index) - new_data = data - |> Map.put(:index, next_index) - |> Map.put(:next_address, next_address) - - case Map.get(data, :watcher) do - nil -> - :ignore - - pid -> - Process.exit(pid, :normal) - end - - if Reward.initiator?(next_address) do - send_node_rewards(next_index) - {:keep_state, new_data} - else - {:ok, pid} = - DetectNodeResponsiveness.start_link(next_address, fn count -> - if Reward.initiator?(next_address, count) do - Logger.debug("Node reward creation...attempt #{count}", - transaction_address: Base.encode16(next_address) - ) - - send_node_rewards(next_index) - end - end) - - Process.monitor(pid) + |> Map.update!(:index, &(&1 + 1)) + |> Map.put(:next_address, Reward.next_address(index + 1)) - {:keep_state, Map.put(data, :watcher, pid)} - end + send_node_rewards(index + 1) + {:keep_state, new_data} end def handle_event( diff --git a/lib/archethic/reward/supervisor.ex b/lib/archethic/reward/supervisor.ex index 680552b42..bad9c7c6d 100644 --- a/lib/archethic/reward/supervisor.ex +++ b/lib/archethic/reward/supervisor.ex @@ -8,6 +8,7 @@ defmodule Archethic.Reward.Supervisor do alias Archethic.Reward.MemTablesLoader alias Archethic.Reward.MemTables.RewardTokens + @spec start_link(any) :: :ignore | {:error, any} | {:ok, pid} def start_link(args \\ []) do Supervisor.start_link(__MODULE__, args, name: Archethic.RewardSupervisor) end diff --git a/lib/archethic/shared_secrets/node_renewal_scheduler.ex b/lib/archethic/shared_secrets/node_renewal_scheduler.ex index 40d71672f..1f6d0bbb4 100644 --- a/lib/archethic/shared_secrets/node_renewal_scheduler.ex +++ b/lib/archethic/shared_secrets/node_renewal_scheduler.ex @@ -44,11 +44,28 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do end @doc false - def init(opts) do - interval = Keyword.get(opts, :interval) - PubSub.register_to_node_update() + def init(args) do + interval = Keyword.get(args, :interval) + state_data = Map.put(%{}, :interval, interval) + + case :persistent_term.get(:archethic_up, nil) do + nil -> + Logger.info("Node Renewal Scheduler: Waiting for node to complete Bootstrap. ") - Logger.info("Starting Node Renewal Scheduler") + PubSub.register_to_node_up() + {:ok, :idle, state_data} + + # wait for node ups + :up -> + {state, new_state_data, events} = start_scheduler(state_data) + {:ok, state, new_state_data, events} + end + end + + def start_scheduler(state_data) do + Logger.info("Node Renewal Scheduler: Starting... ") + + PubSub.register_to_node_update() case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do %Node{authorized?: true, available?: true} -> @@ -56,14 +73,14 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do Logger.info("Node Renewal Scheduler: Scheduled during init") key_index = Crypto.number_of_node_shared_secrets_keys() + new_state_data = state_data |> Map.put(:index, key_index) - {:ok, :idle, %{interval: interval, index: key_index}, - [{:next_event, :internal, :schedule}]} + {:idle, new_state_data, [{:next_event, :internal, :schedule}]} _ -> Logger.info("Node Renewal Scheduler: Scheduler waiting for Node Update Message") - {:ok, :idle, %{interval: interval}} + {:idle, state_data, []} end end @@ -90,6 +107,13 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do {:next_state, :scheduled, new_data} end + def handle_event(:info, :node_up, :idle, state_data) do + PubSub.unregister_to_node_up() + # Node is Up start Scheduler + {:idle, new_state_data, events} = start_scheduler(state_data) + {:keep_state, new_state_data, events} + end + def handle_event( :info, {:node_update, diff --git a/mix.exs b/mix.exs index c28a376bd..7cbaa9bd5 100644 --- a/mix.exs +++ b/mix.exs @@ -114,7 +114,7 @@ defmodule Archethic.MixProject do # run single node "dev.run": ["deps.get", "cmd mix dev.clean", "cmd iex -S mix"], # Must be run before git push --no-verify | any(dialyzer issue) - "dev.checks": ["clean", "format", "compile", "credo", "cmd mix test", "dialyzer"], + "dev.checks": ["clean", "format", "compile", "credo", "cmd mix test --trace", "dialyzer"], # paralele checks "dev.pchecks": [" clean & format & compile & credo & test & dialyzer"], # docker test-net with 3 nodes diff --git a/src/c/nat/miniupnp b/src/c/nat/miniupnp index fa190f294..68c8ec508 160000 --- a/src/c/nat/miniupnp +++ b/src/c/nat/miniupnp @@ -1 +1 @@ -Subproject commit fa190f294a6a4799487f95e23c93646bcabeffef +Subproject commit 68c8ec508a421f4f4af67a63e3eb6f497d2531e1 diff --git a/test/archethic/beacon_chain/slot_timer_test.exs b/test/archethic/beacon_chain/slot_timer_test.exs index 594948411..c39290cf2 100644 --- a/test/archethic/beacon_chain/slot_timer_test.exs +++ b/test/archethic/beacon_chain/slot_timer_test.exs @@ -19,7 +19,13 @@ defmodule Archethic.BeaconChain.SlotTimerTest do end test "receive create_slot message after timer elapsed" do - SlotTimer.start_link([interval: "*/1 * * * * * *"], []) + :persistent_term.put(:archethic_up, nil) + + {:ok, pid} = SlotTimer.start_link([interval: "*/1 * * * * * *"], []) + assert %{interval: "*/1 * * * * * *"} = :sys.get_state(pid) + send(pid, :node_up) + assert %{interval: "*/1 * * * * * *", timer: _} = :sys.get_state(pid) + current = DateTime.utc_now() P2P.add_and_connect_node(%Node{ @@ -37,7 +43,12 @@ defmodule Archethic.BeaconChain.SlotTimerTest do end test "should not send create slot event if node is unavailable" do - SlotTimer.start_link([interval: "*/1 * * * * * *"], []) + :persistent_term.put(:archethic_up, nil) + + {:ok, pid} = SlotTimer.start_link([interval: "*/1 * * * * * *"], []) + assert %{interval: "*/1 * * * * * *"} = :sys.get_state(pid) + send(pid, :node_up) + assert %{interval: "*/1 * * * * * *", timer: _} = :sys.get_state(pid) P2P.add_and_connect_node(%Node{ first_public_key: Crypto.first_node_public_key(), @@ -51,7 +62,12 @@ defmodule Archethic.BeaconChain.SlotTimerTest do end test "handle_info/3 receive a slot creation message" do - {:ok, pid} = SlotTimer.start_link([interval: "0 * * * * * *"], []) + :persistent_term.put(:archethic_up, nil) + + {:ok, pid} = SlotTimer.start_link([interval: "*/1 * * * * * *"], []) + assert %{interval: "*/1 * * * * * *"} = :sys.get_state(pid) + send(pid, :node_up) + assert %{interval: "*/1 * * * * * *", timer: _} = :sys.get_state(pid) P2P.add_and_connect_node(%Node{ first_public_key: Crypto.first_node_public_key(), @@ -76,17 +92,52 @@ defmodule Archethic.BeaconChain.SlotTimerTest do end test "next_slot/1 should get the slot time from a given date" do - {:ok, _pid} = SlotTimer.start_link([interval: "0 * * * * * *", trigger_offset: 0], []) + :persistent_term.put(:archethic_up, nil) + + {:ok, pid} = SlotTimer.start_link([interval: "*/1 * * * * * *"], []) + assert %{interval: "*/1 * * * * * *"} = :sys.get_state(pid) + send(pid, :node_up) + assert %{interval: "*/1 * * * * * *", timer: _} = :sys.get_state(pid) + now = DateTime.utc_now() next_slot_time = SlotTimer.next_slot(now) assert :gt == DateTime.compare(next_slot_time, now) end test "previous_slot/2 should retrieve the previous slot time from a date" do - {:ok, _pid} = SlotTimer.start_link([interval: "0 * * * * * *"], []) + :persistent_term.put(:archethic_up, nil) + + {:ok, pid} = SlotTimer.start_link([interval: "*/1 * * * * * *"], []) + assert %{interval: "*/1 * * * * * *"} = :sys.get_state(pid) + send(pid, :node_up) + assert %{interval: "*/1 * * * * * *", timer: _} = :sys.get_state(pid) now = DateTime.utc_now() previous_slot_time = SlotTimer.previous_slot(now) assert :gt == DateTime.compare(now, previous_slot_time) end + + describe "SlotTimer Behavior During start" do + test "should wait for node :up message" do + :persistent_term.put(:archethic_up, nil) + + {:ok, pid} = SlotTimer.start_link([interval: "*/1 * * * * * *"], []) + assert %{interval: "*/1 * * * * * *"} = :sys.get_state(pid) + end + + test "should start timer post node :up message" do + :persistent_term.put(:archethic_up, nil) + + {:ok, pid} = SlotTimer.start_link([interval: "*/1 * * * * * *"], []) + assert %{interval: "*/1 * * * * * *"} = :sys.get_state(pid) + send(pid, :node_up) + assert %{interval: "*/1 * * * * * *", timer: _} = :sys.get_state(pid) + end + + test "should use :persistent_term archethic_up when slot timer crashes" do + :persistent_term.put(:archethic_up, :up) + {:ok, pid} = SlotTimer.start_link([interval: "*/1 * * * * * *"], []) + assert %{interval: "*/1 * * * * * *", timer: _} = :sys.get_state(pid) + end + end end diff --git a/test/archethic/beacon_chain/subset_test.exs b/test/archethic/beacon_chain/subset_test.exs index 6cbc0730b..7e8356af3 100644 --- a/test/archethic/beacon_chain/subset_test.exs +++ b/test/archethic/beacon_chain/subset_test.exs @@ -42,6 +42,12 @@ defmodule Archethic.BeaconChain.SubsetTest do :ok = Subset.add_end_of_node_sync(subset, %EndOfNodeSync{public_key: public_key}) + MockClient + |> stub(:send_message, fn + _, %NewBeaconTransaction{}, _ -> + {:ok, %Ok{}} + end) + assert %{ current_slot: %Slot{ end_of_node_synchronizations: [%EndOfNodeSync{public_key: ^public_key}] @@ -53,8 +59,12 @@ defmodule Archethic.BeaconChain.SubsetTest do test "new transaction summary is added to the slot and include the storage node confirmation", %{subset: subset} do MockClient - |> stub(:send_message, fn _, _txn = %TransactionSummary{}, _ -> - :ok + |> stub(:send_message, fn + _, _txn = %TransactionSummary{}, _ -> + {:ok, %Ok{}} + + _, %NewBeaconTransaction{}, _ -> + {:ok, %Ok{}} end) start_supervised!({SummaryTimer, interval: "0 0 * * *"}) @@ -100,8 +110,8 @@ defmodule Archethic.BeaconChain.SubsetTest do pid = start_supervised!({Subset, subset: subset}) MockClient - |> stub(:send_message, fn _, _txn = %NewBeaconTransaction{}, _ -> - :ok + |> stub(:send_message, fn _, %NewBeaconTransaction{}, _ -> + {:ok, %Ok{}} end) tx_time = DateTime.utc_now() diff --git a/test/archethic/mining/pending_transaction_validation_test.exs b/test/archethic/mining/pending_transaction_validation_test.exs index 11ed19a6e..8a8c56639 100644 --- a/test/archethic/mining/pending_transaction_validation_test.exs +++ b/test/archethic/mining/pending_transaction_validation_test.exs @@ -388,7 +388,13 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do NetworkLookup.set_network_pool_address(address) - Scheduler.start_link(interval: "0 * * * * *") + {:ok, pid} = Scheduler.start_link(interval: "0 * * * * *") + + assert {:idle, %{interval: "0 * * * * *"}} = :sys.get_state(pid) + + send(pid, :node_up) + + assert {:idle, %{interval: "0 * * * * *"}} = :sys.get_state(pid) MockDB |> stub(:get_latest_burned_fees, fn -> 300_000_000 end) @@ -423,7 +429,13 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do NetworkLookup.set_network_pool_address(address) - Scheduler.start_link(interval: "0 * * * * *") + {:ok, pid} = Scheduler.start_link(interval: "0 * * * * *") + + assert {:idle, %{interval: "0 * * * * *"}} = :sys.get_state(pid) + + send(pid, :node_up) + + assert {:idle, %{interval: "0 * * * * *"}} = :sys.get_state(pid) MockDB |> stub(:get_latest_burned_fees, fn -> 200_000_000 end) @@ -457,7 +469,13 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do NetworkLookup.set_network_pool_address(:crypto.strong_rand_bytes(32)) - Scheduler.start_link(interval: "0 * * * * *") + {:ok, pid} = Scheduler.start_link(interval: "0 * * * * *") + + assert {:idle, %{interval: "0 * * * * *"}} = :sys.get_state(pid) + + send(pid, :node_up) + + assert {:idle, %{interval: "0 * * * * *"}} = :sys.get_state(pid) MockDB |> stub(:get_latest_burned_fees, fn -> 300_000_000 end) @@ -721,6 +739,7 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do end test "should return :ok when a mint reward transaction passes all tests" do + :persistent_term.put(:archethic_up, :up) tx_seed = :crypto.strong_rand_bytes(32) {pub, _} = Crypto.derive_keypair(tx_seed, 1) address = Crypto.derive_address(pub) @@ -753,6 +772,8 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do :persistent_term.put(:reward_gen_addr, Transaction.previous_address(tx)) assert :ok = PendingTransactionValidation.validate(tx) :persistent_term.put(:reward_gen_addr, nil) + + :persistent_term.put(:archethic_up, nil) end test "should return :error when a mint reward transaction has != burned_fees" do @@ -793,9 +814,18 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do {pub, _} = Crypto.derive_keypair(tx_seed, 1) address = Crypto.derive_address(pub) + MockCrypto + |> stub(:first_public_key, fn -> + {pub, _} = + Crypto.derive_keypair("seed_for_mining_transaction_valdiation_test", 0, :secp256r1) + + pub + end) + NetworkLookup.set_network_pool_address(:crypto.strong_rand_bytes(32)) - Scheduler.start_link(interval: "0 * * * * *") + {:ok, pid} = Scheduler.start_link(interval: "0 * * * * *") + send(pid, :node_up) MockDB |> stub(:get_latest_burned_fees, fn -> 300_000_000 end) diff --git a/test/archethic/oracle_chain/scheduler_test.exs b/test/archethic/oracle_chain/scheduler_test.exs index ca4585cd2..65e135b56 100644 --- a/test/archethic/oracle_chain/scheduler_test.exs +++ b/test/archethic/oracle_chain/scheduler_test.exs @@ -25,7 +25,7 @@ defmodule Archethic.OracleChain.SchedulerTest do :ok end - describe "when receives a poll message" do + describe "Oracle Scheduler: when receives a poll message" do setup do me = self() @@ -80,7 +80,14 @@ defmodule Archethic.OracleChain.SchedulerTest do {:ok, pid} = Scheduler.start_link([polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"], []) - assert {:idle, _} = :sys.get_state(pid) + assert {:idle, %{polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"}} = + :sys.get_state(pid) + + send(pid, :node_up) + + assert {:idle, + %{indexes: %{}, polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"}} = + :sys.get_state(pid) P2P.add_and_connect_node(%Node{ ip: {127, 0, 0, 1}, @@ -131,6 +138,15 @@ defmodule Archethic.OracleChain.SchedulerTest do {:ok, pid} = Scheduler.start_link([polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"], []) + assert {:idle, %{polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"}} = + :sys.get_state(pid) + + send(pid, :node_up) + + assert {:idle, + %{indexes: %{}, polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"}} = + :sys.get_state(pid) + P2P.add_and_connect_node(%Node{ ip: {127, 0, 0, 1}, port: 3002, @@ -170,6 +186,15 @@ defmodule Archethic.OracleChain.SchedulerTest do {:ok, pid} = Scheduler.start_link([polling_interval: "0 0 0 * *", summary_interval: "0 0 0 * *"], []) + assert {:idle, %{polling_interval: "0 0 0 * *", summary_interval: "0 0 0 * *"}} = + :sys.get_state(pid) + + send(pid, :node_up) + + assert {:idle, + %{indexes: %{}, polling_interval: "0 0 0 * *", summary_interval: "0 0 0 * *"}} = + :sys.get_state(pid) + P2P.add_and_connect_node(%Node{ ip: {127, 0, 0, 1}, port: 3002, @@ -260,7 +285,14 @@ defmodule Archethic.OracleChain.SchedulerTest do {:ok, pid} = Scheduler.start_link([polling_interval: "0 * * * * *", summary_interval: "0 0 0 * *"], []) - assert {:idle, _} = :sys.get_state(pid) + assert {:idle, %{polling_interval: "0 * * * * *", summary_interval: "0 0 0 * *"}} = + :sys.get_state(pid) + + send(pid, :node_up) + + assert {:idle, + %{indexes: %{}, polling_interval: "0 * * * * *", summary_interval: "0 0 0 * *"}} = + :sys.get_state(pid) P2P.add_and_connect_node(%Node{ ip: {127, 0, 0, 1}, @@ -288,4 +320,108 @@ defmodule Archethic.OracleChain.SchedulerTest do assert timer2 != timer1 end end + + describe "Scheduler Behavior During start" do + test "should be idle when node has not done Bootstrapping" do + :persistent_term.put(:archethic_up, nil) + + {:ok, pid} = + Scheduler.start_link([polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"], []) + + assert {:idle, %{polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"}} = + :sys.get_state(pid) + end + + test "should wait for node up message to start the scheduler, node: not authorized and available" do + :persistent_term.put(:archethic_up, nil) + + {:ok, pid} = + Scheduler.start_link([polling_interval: "0 */2 * * *", summary_interval: "0 0 0 * *"], []) + + assert {:idle, %{polling_interval: "0 */2 * * *", summary_interval: "0 0 0 * *"}} = + :sys.get_state(pid) + + send(pid, :node_up) + + assert {:idle, + %{ + indexes: %{}, + polling_interval: "0 */2 * * *", + summary_interval: "0 0 0 * *" + }} = :sys.get_state(pid) + end + + test "should wait for node up message to start the scheduler, node: authorized and available" do + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3002, + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), + authorized?: true, + authorization_date: DateTime.utc_now(), + geo_patch: "AAA", + available?: true + }) + + {:ok, pid} = + Scheduler.start_link([polling_interval: "0 */3 * * *", summary_interval: "0 0 0 * *"], []) + + assert {:idle, %{polling_interval: "0 */3 * * *", summary_interval: "0 0 0 * *"}} = + :sys.get_state(pid) + + send(pid, :node_up) + + assert {:scheduled, + %{ + indexes: _, + polling_interval: "0 */3 * * *", + summary_interval: "0 0 0 * *", + summary_date: _date_time = %DateTime{} + }} = :sys.get_state(pid) + end + + test "Should use persistent_term :archethic_up when a Scheduler crashes,current node: Not authorized and available" do + :persistent_term.put(:archethic_up, :up) + + {:ok, pid} = + Scheduler.start_link([polling_interval: "0 */4 * * *", summary_interval: "0 0 0 * *"], []) + + assert {:idle, + %{ + polling_interval: "0 */4 * * *", + summary_interval: "0 0 0 * *", + indexes: %{} + }} = :sys.get_state(pid) + + :persistent_term.put(:archethic_up, nil) + end + + test "Should use persistent_term :archethic_up when a Scheduler crashes,current node: authorized and available" do + :persistent_term.put(:archethic_up, :up) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3002, + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), + authorized?: true, + authorization_date: DateTime.utc_now(), + geo_patch: "AAA", + available?: true + }) + + {:ok, pid} = + Scheduler.start_link([polling_interval: "0 */6 * * *", summary_interval: "0 0 0 * *"], []) + + assert {:scheduled, + %{ + polling_interval: "0 */6 * * *", + summary_interval: "0 0 0 * *", + summary_date: _date_time = %DateTime{}, + indexes: _ + }} = :sys.get_state(pid) + + :persistent_term.put(:archethic_up, nil) + end + end end diff --git a/test/archethic/reward/scheduler_test.exs b/test/archethic/reward/scheduler_test.exs index 11492dc56..a6ae56b7f 100644 --- a/test/archethic/reward/scheduler_test.exs +++ b/test/archethic/reward/scheduler_test.exs @@ -17,7 +17,11 @@ defmodule Archethic.Reward.SchedulerTest do MockDB |> stub(:get_latest_burned_fees, fn -> 0 end) - assert {:ok, pid} = Scheduler.start_link(interval: "*/1 * * * * *") + {:ok, pid} = Scheduler.start_link(interval: "*/1 * * * * *") + + assert {:idle, %{interval: "*/1 * * * * *"}} = :sys.get_state(pid) + + send(pid, :node_up) assert {:idle, %{interval: "*/1 * * * * *"}} = :sys.get_state(pid) @@ -58,6 +62,7 @@ defmodule Archethic.Reward.SchedulerTest do me = self() assert {:ok, pid} = Scheduler.start_link(interval: "*/1 * * * * *") + send(pid, :node_up) MockClient |> stub(:send_message, fn @@ -84,10 +89,97 @@ defmodule Archethic.Reward.SchedulerTest do send(me, type) end) - assert {:ok, _} = Scheduler.start_link(interval: "*/1 * * * * *") + assert {:ok, pid} = Scheduler.start_link(interval: "*/1 * * * * *") + send(pid, :node_up) refute_receive :mint_rewards, 1_200 assert_receive :node_rewards, 1_500 end end + + describe "Scheduler Behavior During Start" do + test "should be idle(state with args) when node has not done Bootstrapping" do + :persistent_term.put(:archethic_up, nil) + + {:ok, pid} = Scheduler.start_link(interval: "*/1 * * * * *") + + assert {:idle, %{interval: "*/1 * * * * *"}} = :sys.get_state(pid) + end + + test "should wait for node :up message to start the scheduler, when node is not authorized and available" do + :persistent_term.put(:archethic_up, nil) + + {:ok, pid} = Scheduler.start_link(interval: "*/2 * * * * *") + + assert {:idle, %{interval: "*/2 * * * * *"}} = :sys.get_state(pid) + + send(pid, :node_up) + + assert {:idle, %{interval: "*/2 * * * * *"}} = :sys.get_state(pid) + end + + test "should wait for node :up message to start the scheduler, when node is authorized and available" do + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3002, + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), + authorized?: true, + authorization_date: DateTime.utc_now(), + geo_patch: "AAA", + available?: true + }) + + {:ok, pid} = Scheduler.start_link(interval: "*/3 * * * * *") + + assert {:idle, %{interval: "*/3 * * * * *"}} = :sys.get_state(pid) + send(pid, :node_up) + + assert {:scheduled, + %{ + interval: "*/3 * * * * *", + index: _, + next_address: _ + }} = :sys.get_state(pid) + end + + test "Should use persistent_term :archethic_up when a Scheduler crashes, when a node is not authorized and available" do + :persistent_term.put(:archethic_up, :up) + + {:ok, pid} = Scheduler.start_link(interval: "*/4 * * * * *") + + assert {:idle, + %{ + interval: "*/4 * * * * *" + }} = :sys.get_state(pid) + + :persistent_term.put(:archethic_up, nil) + end + + test "Should use persistent_term :archethic_up when a Scheduler crashes, when a node is authorized and available" do + :persistent_term.put(:archethic_up, :up) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3002, + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), + authorized?: true, + authorization_date: DateTime.utc_now(), + geo_patch: "AAA", + available?: true + }) + + {:ok, pid} = Scheduler.start_link(interval: "*/5 * * * * *") + + assert {:scheduled, + %{ + interval: "*/5 * * * * *", + index: _, + next_address: _ + }} = :sys.get_state(pid) + + :persistent_term.put(:archethic_up, nil) + end + end end diff --git a/test/archethic/shared_secrets/node_renewal_scheduler_test.exs b/test/archethic/shared_secrets/node_renewal_scheduler_test.exs index a656fb04c..2ccdb98ad 100644 --- a/test/archethic/shared_secrets/node_renewal_scheduler_test.exs +++ b/test/archethic/shared_secrets/node_renewal_scheduler_test.exs @@ -28,6 +28,8 @@ defmodule Archethic.SharedSecrets.NodeRenewalSchedulerTest do end test "should initiate the node renewal scheduler and trigger node renewal every each seconds" do + :persistent_term.put(:archethic_up, :up) + P2P.add_and_connect_node(%Node{ first_public_key: Crypto.last_node_public_key(), last_public_key: Crypto.last_node_public_key(), @@ -64,9 +66,12 @@ defmodule Archethic.SharedSecrets.NodeRenewalSchedulerTest do ) assert_receive :renewal_processed, 3_000 + :persistent_term.put(:archethic_up, nil) end test "should retrigger the scheduling after tx replication" do + :persistent_term.put(:archethic_up, :up) + P2P.add_and_connect_node(%Node{ first_public_key: Crypto.last_node_public_key(), last_public_key: Crypto.last_node_public_key(), @@ -101,5 +106,94 @@ defmodule Archethic.SharedSecrets.NodeRenewalSchedulerTest do assert {:scheduled, %{timer: timer2}} = :sys.get_state(pid) assert timer2 != timer1 + :persistent_term.put(:archethic_up, nil) + end + + describe "Scheduler Behavior During start" do + test "should be idle when node has not done Bootstrapping" do + :persistent_term.put(:archethic_up, nil) + + assert {:ok, pid} = Scheduler.start_link([interval: "*/2 * * * * *"], []) + + assert {:idle, %{interval: "*/2 * * * * *"}} = :sys.get_state(pid) + end + + test "should wait for node up message to start the scheduler, node: not authorized and available" do + :persistent_term.put(:archethic_up, nil) + + assert {:ok, pid} = Scheduler.start_link([interval: "*/3 * * * * *"], []) + + assert {:idle, %{interval: "*/3 * * * * *"}} = :sys.get_state(pid) + + send(pid, :node_up) + + assert {:idle, + %{ + interval: "*/3 * * * * *" + }} = :sys.get_state(pid) + end + + test "should wait for node up message to start the scheduler, node: authorized and available" do + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3002, + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), + authorized?: true, + authorization_date: DateTime.utc_now(), + geo_patch: "AAA", + available?: true + }) + + assert {:ok, pid} = Scheduler.start_link([interval: "*/4 * * * * *"], []) + + assert {:idle, %{interval: "*/4 * * * * *"}} = :sys.get_state(pid) + + send(pid, :node_up) + + assert {:scheduled, + %{ + interval: "*/4 * * * * *", + index: _ + }} = :sys.get_state(pid) + end + + test "Should use persistent_term :archethic_up when a Scheduler crashes,current node: Not authorized and available" do + :persistent_term.put(:archethic_up, :up) + + assert {:ok, pid} = Scheduler.start_link([interval: "*/5 * * * * *"], []) + + assert {:idle, + %{ + interval: "*/5 * * * * *" + }} = :sys.get_state(pid) + + :persistent_term.put(:archethic_up, nil) + end + + test "Should use persistent_term :archethic_up when a Scheduler crashes, current node: authorized and available" do + :persistent_term.put(:archethic_up, :up) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3002, + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.first_node_public_key(), + authorized?: true, + authorization_date: DateTime.utc_now(), + geo_patch: "AAA", + available?: true + }) + + assert {:ok, pid} = Scheduler.start_link([interval: "*/6 * * * * *"], []) + + assert {:scheduled, + %{ + interval: "*/6 * * * * *", + index: _ + }} = :sys.get_state(pid) + + :persistent_term.put(:archethic_up, nil) + end end end diff --git a/test/archethic/utils/detect_node_responsiveness_test.exs b/test/archethic/utils/detect_node_responsiveness_test.exs index 6a0499ff7..6e0c58244 100644 --- a/test/archethic/utils/detect_node_responsiveness_test.exs +++ b/test/archethic/utils/detect_node_responsiveness_test.exs @@ -1,5 +1,5 @@ defmodule Archethic.Utils.DetectNodeResponsivenessTest do - use ArchethicCase + use ArchethicCase, async: false @timeout 200 @sleep_timeout 350