From 93a3f59c082d574237071d301f190cb71c715d05 Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Tue, 25 Oct 2022 14:06:50 +0200 Subject: [PATCH] Fix slot/summary timer prev/next function --- lib/archethic/beacon_chain/slot_timer.ex | 81 ++++++++++++++------- lib/archethic/beacon_chain/subset.ex | 25 ++++--- lib/archethic/beacon_chain/summary_timer.ex | 30 ++++++-- lib/archethic/self_repair/sync.ex | 3 +- 4 files changed, 92 insertions(+), 47 deletions(-) diff --git a/lib/archethic/beacon_chain/slot_timer.ex b/lib/archethic/beacon_chain/slot_timer.ex index d1d5ad9a71..eea27ce070 100644 --- a/lib/archethic/beacon_chain/slot_timer.ex +++ b/lib/archethic/beacon_chain/slot_timer.ex @@ -37,38 +37,54 @@ defmodule Archethic.BeaconChain.SlotTimer do Give the next beacon chain slot using the `SlotTimer` interval """ @spec next_slot(DateTime.t()) :: DateTime.t() - def next_slot(date_from = %DateTime{microsecond: {0, 0}}) do - get_interval() - |> CronParser.parse!(true) - |> CronScheduler.get_next_run_dates(DateTime.to_naive(date_from)) - |> Enum.at(1) - |> DateTime.from_naive!("Etc/UTC") - end - def next_slot(date_from = %DateTime{}) do - get_interval() - |> CronParser.parse!(true) - |> CronScheduler.get_next_run_date!(DateTime.to_naive(date_from)) - |> DateTime.from_naive!("Etc/UTC") + cron_expression = CronParser.parse!(get_interval(), true) + naive_date_from = DateTime.to_naive(date_from) + + if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do + case date_from do + %DateTime{microsecond: {0, _}} -> + cron_expression + |> CronScheduler.get_next_run_dates(naive_date_from) + |> Enum.at(1) + |> DateTime.from_naive!("Etc/UTC") + + _ -> + cron_expression + |> CronScheduler.get_next_run_date!(naive_date_from) + |> DateTime.from_naive!("Etc/UTC") + end + else + cron_expression + |> CronScheduler.get_next_run_date!(naive_date_from) + |> DateTime.from_naive!("Etc/UTC") + end end @doc """ Returns the previous slot from the given date """ @spec previous_slot(DateTime.t()) :: DateTime.t() - def previous_slot(date_from = %DateTime{microsecond: {0, 0}}) do - get_interval() - |> CronParser.parse!(true) - |> CronScheduler.get_previous_run_dates(DateTime.to_naive(date_from)) - |> Enum.at(1) - |> DateTime.from_naive!("Etc/UTC") - end - def previous_slot(date_from = %DateTime{}) do - get_interval() - |> CronParser.parse!(true) - |> CronScheduler.get_previous_run_date!(DateTime.to_naive(date_from)) - |> DateTime.from_naive!("Etc/UTC") + cron_expression = CronParser.parse!(get_interval(), true) + naive_date_from = DateTime.to_naive(date_from) + + if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do + case date_from do + %DateTime{microsecond: {microsecond, _}} when microsecond > 0 -> + DateTime.truncate(date_from, :second) + + _ -> + cron_expression + |> CronScheduler.get_previous_run_dates(naive_date_from) + |> Enum.at(1) + |> DateTime.from_naive!("Etc/UTC") + end + else + cron_expression + |> CronScheduler.get_previous_run_date!(naive_date_from) + |> DateTime.from_naive!("Etc/UTC") + end end @doc """ @@ -113,15 +129,19 @@ defmodule Archethic.BeaconChain.SlotTimer do :up -> Logger.info("Slot Timer: Starting...") + next_time = next_slot(DateTime.utc_now() |> DateTime.truncate(:second)) - {:ok, %{interval: interval, timer: schedule_new_slot(interval)}} + {:ok, %{interval: interval, timer: schedule_new_slot(interval), next_time: next_time}} end end def handle_info(:node_up, server_data = %{interval: interval}) do Logger.info("Slot Timer: Starting...") - new_server_data = Map.put(server_data, :timer, schedule_new_slot(interval)) + new_server_data = + server_data + |> Map.put(:timer, schedule_new_slot(interval)) + |> Map.put(:next_time, next_slot(DateTime.utc_now() |> DateTime.truncate(:second))) {:noreply, new_server_data, :hibernate} end @@ -153,7 +173,14 @@ defmodule Archethic.BeaconChain.SlotTimer do :skip end - {:noreply, Map.put(state, :timer, timer), :hibernate} + next_time = next_slot(next_time) + + new_state = + state + |> Map.put(:timer, timer) + |> Map.put(:next_time, next_time) + + {:noreply, new_state, :hibernate} end def handle_cast({:new_conf, conf}, state) do diff --git a/lib/archethic/beacon_chain/subset.ex b/lib/archethic/beacon_chain/subset.ex index 5655552416..d0ba87592b 100644 --- a/lib/archethic/beacon_chain/subset.ex +++ b/lib/archethic/beacon_chain/subset.ex @@ -58,6 +58,14 @@ defmodule Archethic.BeaconChain.Subset do GenServer.cast(via_tuple(subset), {:add_slot, slot, node_public_key, signature}) end + @doc """ + Add node public key to the corresponding subset for beacon updates + """ + @spec subscribe_for_beacon_updates(binary(), Crypto.key()) :: :ok + def subscribe_for_beacon_updates(subset, node_public_key) do + GenServer.cast(via_tuple(subset), {:subscribe_node_to_beacon_updates, node_public_key}) + end + @doc """ Get the current slot """ @@ -77,7 +85,10 @@ defmodule Archethic.BeaconChain.Subset do %{ node_public_key: Crypto.first_node_public_key(), subset: subset, - current_slot: %Slot{subset: subset, slot_time: SlotTimer.next_slot(DateTime.utc_now())}, + current_slot: %Slot{ + subset: subset, + slot_time: SlotTimer.next_slot(DateTime.utc_now() |> DateTime.truncate(:second)) + }, subscribed_nodes: [], postponed: %{end_of_sync: [], transaction_attestations: []} }} @@ -218,7 +229,7 @@ defmodule Archethic.BeaconChain.Subset do new_state = update_in(state, [:postponed, :transaction_attestations], &[attestation | &1]) Logger.info( - "Transaction #{type}@#{Base.encode16(address)} will be added to the next beacon chain (#{DateTime.to_string(next_slot_time)} slot)", + "Transaction #{type}@#{Base.encode16(address)} will be added to the next beacon chain (#{DateTime.to_string(next_slot_time)} slot) - tx timestamp: #{timestamp} - current slot time: #{slot_time}", beacon_subset: Base.encode16(subset) ) @@ -261,7 +272,7 @@ defmodule Archethic.BeaconChain.Subset do # Avoid to store or dispatch an empty beacon's slot unless Slot.empty?(current_slot) do - current_slot = %{current_slot | slot_time: SlotTimer.previous_slot(time)} + # current_slot = %{current_slot | slot_time: SlotTimer.previous_slot(time)} if summary_time?(time) do SummaryCache.add_slot(subset, current_slot) @@ -371,12 +382,4 @@ defmodule Archethic.BeaconChain.Subset do end defp ensure_p2p_view(slot = %Slot{}, _), do: slot - - @doc """ - Add node public key to the corresponding subset for beacon updates - """ - @spec subscribe_for_beacon_updates(binary(), Crypto.key()) :: :ok - def subscribe_for_beacon_updates(subset, node_public_key) do - GenServer.cast(via_tuple(subset), {:subscribe_node_to_beacon_updates, node_public_key}) - end end diff --git a/lib/archethic/beacon_chain/summary_timer.ex b/lib/archethic/beacon_chain/summary_timer.ex index d3663162df..99a16b6417 100644 --- a/lib/archethic/beacon_chain/summary_timer.ex +++ b/lib/archethic/beacon_chain/summary_timer.ex @@ -28,10 +28,18 @@ defmodule Archethic.BeaconChain.SummaryTimer do naive_date_from = DateTime.to_naive(date_from) if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do - cron_expression - |> CronScheduler.get_next_run_dates(naive_date_from) - |> Enum.at(1) - |> DateTime.from_naive!("Etc/UTC") + case date_from do + %DateTime{microsecond: {0, _}} -> + cron_expression + |> CronScheduler.get_next_run_dates(naive_date_from) + |> Enum.at(1) + |> DateTime.from_naive!("Etc/UTC") + + _ -> + cron_expression + |> CronScheduler.get_next_run_date!(naive_date_from) + |> DateTime.from_naive!("Etc/UTC") + end else cron_expression |> CronScheduler.get_next_run_date!(naive_date_from) @@ -48,10 +56,16 @@ defmodule Archethic.BeaconChain.SummaryTimer do naive_date_from = DateTime.to_naive(date_from) if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do - cron_expression - |> CronScheduler.get_previous_run_dates(naive_date_from) - |> Enum.at(1) - |> DateTime.from_naive!("Etc/UTC") + case date_from do + %DateTime{microsecond: {microsecond, _}} when microsecond > 0 -> + DateTime.truncate(date_from, :second) + + _ -> + cron_expression + |> CronScheduler.get_previous_run_dates(naive_date_from) + |> Enum.at(1) + |> DateTime.from_naive!("Etc/UTC") + end else cron_expression |> CronScheduler.get_previous_run_date!(naive_date_from) diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 41080b3999..b8699bb0cc 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -106,7 +106,8 @@ defmodule Archethic.SelfRepair.Sync do patch :: binary() ) :: :ok | {:error, :unreachable_nodes} def load_missed_transactions(last_sync_date = %DateTime{}, patch) when is_binary(patch) do - last_summary_time = BeaconChain.previous_summary_time(DateTime.utc_now()) + last_summary_time = + BeaconChain.previous_summary_time(DateTime.utc_now() |> DateTime.truncate(:second)) if last_summary_time > last_sync_date do Logger.info(