From 2c3250a95bcbaaecdaeb8c32db1a346a4dc3e4c0 Mon Sep 17 00:00:00 2001 From: Bastien CHAMAGNE Date: Fri, 5 May 2023 09:46:58 +0200 Subject: [PATCH] Do not schedule an interval tick if there is a datetime tick at the same time --- lib/archethic/contracts/worker.ex | 52 +++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/lib/archethic/contracts/worker.ex b/lib/archethic/contracts/worker.ex index 5f42f1eff..a804966c1 100644 --- a/lib/archethic/contracts/worker.ex +++ b/lib/archethic/contracts/worker.ex @@ -31,6 +31,8 @@ defmodule Archethic.Contracts.Worker do alias Archethic.Utils alias Archethic.Utils.DetectNodeResponsiveness + alias Crontab.CronExpression.Parser, as: CronParser + @extended_mode? Mix.env() != :prod require Logger @@ -60,9 +62,11 @@ defmodule Archethic.Contracts.Worker do end def handle_continue(:start_schedulers, state = %{contract: %Contract{triggers: triggers}}) do + triggers_type = Map.keys(triggers) + new_state = - Enum.reduce(triggers, state, fn {trigger_type, _}, acc -> - case schedule_trigger(trigger_type) do + Enum.reduce(triggers_type, state, fn trigger_type, acc -> + case schedule_trigger(trigger_type, triggers_type) do timer when is_reference(timer) -> Map.update(acc, :timers, %{trigger_type => timer}, &Map.put(&1, trigger_type, timer)) @@ -82,7 +86,7 @@ defmodule Archethic.Contracts.Worker do contract_tx = Constants.to_transaction(contract.constants.contract) meta = log_metadata(contract_tx, trigger_tx) - Logger.debug("Contract execution started", meta) + Logger.debug("Contract execution started (trigger=transaction)", meta) with true <- enough_funds?(contract_tx.address), {:ok, calls} <- TransactionChain.fetch_contract_calls(contract_tx.address), @@ -118,7 +122,7 @@ defmodule Archethic.Contracts.Worker do contract_tx = Constants.to_transaction(contract.constants.contract) meta = log_metadata(contract_tx) - Logger.debug("Contract execution started", meta) + Logger.debug("Contract execution started (trigger=datetime)", meta) with true <- enough_funds?(contract_tx.address), {:ok, calls} <- TransactionChain.fetch_contract_calls(contract_tx.address), @@ -142,12 +146,12 @@ defmodule Archethic.Contracts.Worker do # TRIGGER: INTERVAL def handle_info( {:trigger, trigger_type = {:interval, interval}}, - state = %{contract: contract} + state = %{contract: contract = %Contract{triggers: triggers}} ) do contract_tx = Constants.to_transaction(contract.constants.contract) meta = log_metadata(contract_tx) - Logger.debug("Contract execution started", meta) + Logger.debug("Contract execution started (trigger=interval)", meta) with true <- enough_funds?(contract_tx.address), {:ok, calls} <- TransactionChain.fetch_contract_calls(contract_tx.address), @@ -165,7 +169,7 @@ defmodule Archethic.Contracts.Worker do Logger.debug("Contract execution failed", meta) end - interval_timer = schedule_trigger({:interval, interval}) + interval_timer = schedule_trigger({:interval, interval}, Map.keys(triggers)) {:noreply, put_in(state, [:timers, :interval], interval_timer)} end @@ -178,7 +182,7 @@ defmodule Archethic.Contracts.Worker do {:ok, oracle_tx} = TransactionChain.get_transaction(tx_address) meta = log_metadata(contract_tx, oracle_tx) - Logger.debug("Contract execution started", meta) + Logger.debug("Contract execution started (trigger=oracle)", meta) with true <- enough_funds?(contract_tx.address), {:ok, calls} <- TransactionChain.fetch_contract_calls(contract_tx.address), @@ -208,15 +212,37 @@ defmodule Archethic.Contracts.Worker do {:via, Registry, {ContractRegistry, address}} end - defp schedule_trigger(trigger = {:interval, interval}) do + defp schedule_trigger(trigger = {:interval, interval}, triggers_type) do + now = DateTime.utc_now() + + next_tick = + interval + |> CronParser.parse!(@extended_mode?) + |> Utils.next_date(now) + + # do not allow an interval trigger if there is a datetime trigger at same time + # because one of them would get a "transaction is already mining" + next_tick = + if {:datetime, next_tick} in triggers_type do + Logger.debug( + "Contract scheduler skips next tick for trigger=interval because there is a trigger=datetime at the same time that takes precedence" + ) + + interval + |> CronParser.parse!(@extended_mode?) + |> Utils.next_date(next_tick) + else + next_tick + end + Process.send_after( self(), {:trigger, trigger}, - Utils.time_offset(interval, DateTime.utc_now(), @extended_mode?) * 1000 + DateTime.diff(next_tick, now, :millisecond) ) end - defp schedule_trigger(trigger = {:datetime, datetime = %DateTime{}}) do + defp schedule_trigger(trigger = {:datetime, datetime = %DateTime{}}, _triggers_type) do seconds = DateTime.diff(datetime, DateTime.utc_now()) if seconds > 0 do @@ -224,11 +250,11 @@ defmodule Archethic.Contracts.Worker do end end - defp schedule_trigger(:oracle) do + defp schedule_trigger(:oracle, _triggers_type) do PubSub.register_to_new_transaction_by_type(:oracle) end - defp schedule_trigger(_), do: :ok + defp schedule_trigger(_trigger_type, _triggers_type), do: :ok defp handle_new_transaction(next_transaction = %Transaction{}) do validation_nodes = get_validation_nodes(next_transaction)