diff --git a/lib/archethic/reward/scheduler.ex b/lib/archethic/reward/scheduler.ex index 2186a22c7..be71ed00c 100644 --- a/lib/archethic/reward/scheduler.ex +++ b/lib/archethic/reward/scheduler.ex @@ -1,7 +1,7 @@ defmodule Archethic.Reward.Scheduler do @moduledoc false - use GenServer + use GenStateMachine, callback_mode: [:handle_event_function] alias Crontab.CronExpression.Parser, as: CronParser alias Crontab.Scheduler, as: CronScheduler @@ -24,7 +24,7 @@ defmodule Archethic.Reward.Scheduler do require Logger def start_link(args \\ []) do - GenServer.start_link(__MODULE__, args, name: __MODULE__) + GenStateMachine.start_link(__MODULE__, args, name: __MODULE__) end @doc """ @@ -32,7 +32,7 @@ defmodule Archethic.Reward.Scheduler do """ @spec last_date() :: DateTime.t() def last_date do - GenServer.call(__MODULE__, :last_date) + GenStateMachine.call(__MODULE__, :last_date) end def init(args) do @@ -44,92 +44,168 @@ defmodule Archethic.Reward.Scheduler do %Node{authorized?: true, available?: true} -> Logger.info("Reward Scheduler scheduled during init") - {:ok, %{interval: interval, timer: schedule(interval)}, :hibernate} + {:ok, :idle, %{interval: interval}, {:next_event, :internal, :schedule}} _ -> Logger.info("Reward Scheduler waitng for Node Update Message") - {:ok, %{interval: interval}, :hibernate} + {:ok, :idle, %{interval: interval}} end - {:ok, %{interval: interval}, :hibernate} + {:ok, :idle, %{interval: interval}} end - def handle_info( + def handle_event( + :info, {:node_update, %Node{authorized?: true, available?: true, first_public_key: first_public_key}}, - state = %{interval: interval} + :idle, + _data ) do if Crypto.first_node_public_key() == first_public_key do - case Map.get(state, :timer) do - nil -> - timer = schedule(interval) - Logger.info("Start the network pool reward scheduler") - {:noreply, Map.put(state, :timer, timer), :hibernate} - - _ -> - {:noreply, state} - end + Logger.info("Start the network pool reward scheduler") + {:keep_state_and_data, {:next_event, :internal, :schedule}} else - {:noreply, state} + :keep_state_and_data end end - def handle_info( + def handle_event( + :info, {:node_update, %Node{authorized?: false, first_public_key: first_public_key}}, - state = %{timer: timer} - ) do + state, + data + ) + when state != :idle do if Crypto.first_node_public_key() == first_public_key do - Process.cancel_timer(timer) - {:noreply, Map.delete(state, :timer)} + data + |> Map.get(:timer, make_ref()) + |> Process.cancel_timer() + + {:next_state, :idle, Map.delete(state, :timer)} else - {:noreply, state} + :keep_state_and_data end end - def handle_info( + def handle_event( + :info, {:node_update, %Node{available?: false, first_public_key: first_public_key}}, - state = %{timer: timer} + _state, + data ) do if Crypto.first_node_public_key() == first_public_key do - Process.cancel_timer(timer) - {:noreply, Map.delete(state, :timer)} + data + |> Map.get(:timer, make_ref()) + |> Process.cancel_timer() + + {:next_state, :idle, Map.delete(data, :timer)} else - {:noreply, state} + :keep_state_and_data end end - def handle_info({:node_update, _}, state), do: {:noreply, state} + def handle_event(:info, {:node_update, _}, _state, _data), do: :keep_state_and_data - def handle_info(:mint_rewards, state = %{interval: interval}) do - timer = schedule(interval) + def handle_event(:info, :mint_rewards, :scheduled, data) do + {:next_state, :triggered, data, {:next_event, :internal, :make_rewards}} + end + + def handle_event( + :info, + {:new_transaction, address, :mint_rewards, _timestamp}, + :triggered, + _data + ) do + PubSub.unregister_to_new_transaction_by_address(address) + + send_node_rewards() + :keep_state_and_data + end + + def handle_event( + :info, + {:new_transaction, address, :node_rewards, _timestamp}, + :triggered, + data + ) do + PubSub.unregister_to_new_transaction_by_address(address) + + case Map.get(data, :watcher) do + nil -> + :ignore + + {_address, pid} -> + Process.exit(pid, :normal) + end + + {:keep_state, data, {:next_event, :internal, :schedule}} + end + + def handle_event( + :info, + {:DOWN, _ref, :process, pid, _}, + :triggered, + data = %{watcher: {address, watcher_pid}} + ) + when watcher_pid == pid do + PubSub.unregister_to_new_transaction_by_address(address) + {:keep_state, Map.delete(data, :watcher), {:next_event, :internal, :schedule}} + end + def handle_event( + :info, + {:DOWN, _ref, :process, pid, _}, + :scheduled, + data = %{watcher: {address, watcher_pid}} + ) + when pid == watcher_pid do + PubSub.unregister_to_new_transaction_by_address(address) + {:keep_state, Map.delete(data, :watcher)} + end + + def handle_event(:internal, :make_rewards, :triggered, data) do tx_address = Reward.next_address() if Reward.initiator?(tx_address) do mint_node_rewards() + :keep_state_and_data else - DetectNodeResponsiveness.start_link(tx_address, fn count -> - if Reward.initiator?(tx_address, count) do - Logger.debug("Mint reward creation...attempt #{count}", - transaction_address: Base.encode16(tx_address) - ) - - mint_node_rewards() - end - end) + {:ok, pid} = + DetectNodeResponsiveness.start_link(tx_address, fn count -> + if Reward.initiator?(tx_address, count) do + Logger.debug("Mint reward creation...attempt #{count}", + transaction_address: Base.encode16(tx_address) + ) + + mint_node_rewards() + end + end) + + Process.monitor(pid) + + {:keep_state, Map.put(data, :watcher, {tx_address, pid})} end + end - {:noreply, Map.put(state, :timer, timer), :hibernate} + def handle_event(:internal, :schedule, _state, data = %{interval: interval}) do + timer = schedule(interval) + new_data = Map.put(data, :timer, timer) + {:next_state, :scheduled, new_data} end - def handle_info({:new_transaction, _address, :mint_rewards, _timestamp}, state) do - send_node_rewards() - {:noreply, state, :hibernate} + def handle_event({:call, from}, :last_date, _state, _data = %{interval: interval}) do + {:keep_state_and_data, {:reply, from, get_last_date(interval)}} end - def handle_info({:new_transaction, _address, _type, _timestamp}, state) do - {:noreply, state, :hibernate} + def handle_event(:cast, {:new_conf, conf}, _, data) do + case Keyword.get(conf, :interval) do + nil -> + :keep_state_and_data + + new_interval -> + {:noreply, Map.put(data, :interval, new_interval)} + end end defp mint_node_rewards do @@ -152,22 +228,11 @@ defmodule Archethic.Reward.Scheduler do end defp send_node_rewards do - Reward.new_node_rewards() - |> Archethic.send_new_transaction() - end - - def handle_call(:last_date, _, state = %{interval: interval}) do - {:reply, get_last_date(interval), state} - end + node_reward_tx = Reward.new_node_rewards() - def handle_cast({:new_conf, conf}, state) do - case Keyword.get(conf, :interval) do - nil -> - {:noreply, state} + PubSub.register_to_new_transaction_by_address(node_reward_tx.address) - new_interval -> - {:noreply, Map.put(state, :interval, new_interval)} - end + Archethic.send_new_transaction(node_reward_tx) end defp get_last_date(interval) do @@ -194,6 +259,6 @@ defmodule Archethic.Reward.Scheduler do def config_change(nil), do: :ok def config_change(conf) do - GenServer.cast(__MODULE__, {:new_conf, conf}) + GenStateMachine.cast(__MODULE__, {:new_conf, conf}) end end diff --git a/test/archethic/reward/scheduler_test.exs b/test/archethic/reward/scheduler_test.exs index 1dd3a9a4a..f2b1cf1e4 100644 --- a/test/archethic/reward/scheduler_test.exs +++ b/test/archethic/reward/scheduler_test.exs @@ -9,6 +9,8 @@ defmodule Archethic.Reward.SchedulerTest do alias Archethic.Reward.Scheduler + alias Archethic.TransactionChain.Transaction + import Mox setup do @@ -29,7 +31,7 @@ defmodule Archethic.Reward.SchedulerTest do assert {:ok, pid} = Scheduler.start_link(interval: "*/1 * * * * *") - assert %{interval: "*/1 * * * * *"} = :sys.get_state(pid) + assert {:idle, %{interval: "*/1 * * * * *"}} = :sys.get_state(pid) send( pid, @@ -41,6 +43,8 @@ defmodule Archethic.Reward.SchedulerTest do }} ) + assert {:scheduled, %{timer: _}} = :sys.get_state(pid) + :erlang.trace(pid, true, [:receive]) assert_receive {:trace, ^pid, :receive, :mint_rewards}, 3_000 @@ -56,12 +60,12 @@ defmodule Archethic.Reward.SchedulerTest do MockClient |> stub(:send_message, fn - _, %StartMining{transaction: %{type: type}}, _ when type == :mint_rewards -> - send(pid, {:new_transaction, nil, :mint_rewards, nil}) - send(me, type) + _, %StartMining{transaction: %Transaction{address: address, type: :mint_rewards}}, _ -> + send(pid, {:new_transaction, address, :mint_rewards, DateTime.utc_now()}) + send(me, :mint_rewards) - _, %StartMining{transaction: %{type: type}}, _ when type == :node_rewards -> - send(me, type) + _, %StartMining{transaction: %{type: :node_rewards}}, _ -> + send(me, :node_rewards) end) send(