Skip to content

Commit

Permalink
Fix slot/summary timer prev/next function
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Oct 25, 2022
1 parent 867ddc5 commit e4a8031
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 49 deletions.
86 changes: 57 additions & 29 deletions lib/archethic/beacon_chain/slot_timer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,54 @@ defmodule Archethic.BeaconChain.SlotTimer do
Give the next beacon chain slot using the `SlotTimer` interval
"""
@spec next_slot(DateTime.t()) :: DateTime.t()
def next_slot(date_from = %DateTime{microsecond: {0, 0}}) do
get_interval()
|> CronParser.parse!(true)
|> CronScheduler.get_next_run_dates(DateTime.to_naive(date_from))
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
end

def next_slot(date_from = %DateTime{}) do
get_interval()
|> CronParser.parse!(true)
|> CronScheduler.get_next_run_date!(DateTime.to_naive(date_from))
|> DateTime.from_naive!("Etc/UTC")
cron_expression = CronParser.parse!(get_interval(), true)
naive_date_from = DateTime.to_naive(date_from)

if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do
case date_from do
%DateTime{microsecond: {0, _}} ->
cron_expression
|> CronScheduler.get_next_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")

_ ->
cron_expression
|> CronScheduler.get_next_run_date!(naive_date_from)
|> DateTime.from_naive!("Etc/UTC")
end
else
cron_expression
|> CronScheduler.get_next_run_date!(naive_date_from)
|> DateTime.from_naive!("Etc/UTC")
end
end

@doc """
Returns the previous slot from the given date
"""
@spec previous_slot(DateTime.t()) :: DateTime.t()
def previous_slot(date_from = %DateTime{microsecond: {0, 0}}) do
get_interval()
|> CronParser.parse!(true)
|> CronScheduler.get_previous_run_dates(DateTime.to_naive(date_from))
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
end

def previous_slot(date_from = %DateTime{}) do
get_interval()
|> CronParser.parse!(true)
|> CronScheduler.get_previous_run_date!(DateTime.to_naive(date_from))
|> DateTime.from_naive!("Etc/UTC")
cron_expression = CronParser.parse!(get_interval(), true)
naive_date_from = DateTime.to_naive(date_from)

if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do
case date_from do
%DateTime{microsecond: {microsecond, _}} when microsecond > 0 ->
DateTime.truncate(date_from, :second)

_ ->
cron_expression
|> CronScheduler.get_previous_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
end
else
cron_expression
|> CronScheduler.get_previous_run_date!(naive_date_from)
|> DateTime.from_naive!("Etc/UTC")
end
end

@doc """
Expand Down Expand Up @@ -113,28 +129,33 @@ defmodule Archethic.BeaconChain.SlotTimer do

:up ->
Logger.info("Slot Timer: Starting...")
next_time = next_slot(DateTime.utc_now() |> DateTime.truncate(:second))

{:ok, %{interval: interval, timer: schedule_new_slot(interval)}}
{:ok, %{interval: interval, timer: schedule_new_slot(interval), next_time: next_time}}
end
end

def handle_info(:node_up, server_data = %{interval: interval}) do
Logger.info("Slot Timer: Starting...")

new_server_data = Map.put(server_data, :timer, schedule_new_slot(interval))
new_server_data =
server_data
|> Map.put(:timer, schedule_new_slot(interval))
|> Map.put(:next_time, next_slot(DateTime.utc_now() |> DateTime.truncate(:second)))

{:noreply, new_server_data, :hibernate}
end

def handle_info(
:new_slot,
state = %{
interval: interval
interval: interval,
next_time: next_time
}
) do
timer = schedule_new_slot(interval)

slot_time = DateTime.utc_now() |> DateTime.truncate(:millisecond)
slot_time = next_time

PubSub.notify_current_epoch_of_slot_timer(slot_time)

Expand All @@ -153,7 +174,14 @@ defmodule Archethic.BeaconChain.SlotTimer do
:skip
end

{:noreply, Map.put(state, :timer, timer), :hibernate}
next_time = next_slot(next_time)

new_state =
state
|> Map.put(:timer, timer)
|> Map.put(:next_time, next_time)

{:noreply, new_state, :hibernate}
end

def handle_cast({:new_conf, conf}, state) do
Expand Down
25 changes: 14 additions & 11 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ defmodule Archethic.BeaconChain.Subset do
GenServer.cast(via_tuple(subset), {:add_slot, slot, node_public_key, signature})
end

@doc """
Add node public key to the corresponding subset for beacon updates
"""
@spec subscribe_for_beacon_updates(binary(), Crypto.key()) :: :ok
def subscribe_for_beacon_updates(subset, node_public_key) do
GenServer.cast(via_tuple(subset), {:subscribe_node_to_beacon_updates, node_public_key})
end

@doc """
Get the current slot
"""
Expand All @@ -77,7 +85,10 @@ defmodule Archethic.BeaconChain.Subset do
%{
node_public_key: Crypto.first_node_public_key(),
subset: subset,
current_slot: %Slot{subset: subset, slot_time: SlotTimer.next_slot(DateTime.utc_now())},
current_slot: %Slot{
subset: subset,
slot_time: SlotTimer.next_slot(DateTime.utc_now() |> DateTime.truncate(:second))
},
subscribed_nodes: [],
postponed: %{end_of_sync: [], transaction_attestations: []}
}}
Expand Down Expand Up @@ -218,7 +229,7 @@ defmodule Archethic.BeaconChain.Subset do
new_state = update_in(state, [:postponed, :transaction_attestations], &[attestation | &1])

Logger.info(
"Transaction #{type}@#{Base.encode16(address)} will be added to the next beacon chain (#{DateTime.to_string(next_slot_time)} slot)",
"Transaction #{type}@#{Base.encode16(address)} will be added to the next beacon chain (#{DateTime.to_string(next_slot_time)} slot) - tx timestamp: #{timestamp} - current slot time: #{slot_time}",
beacon_subset: Base.encode16(subset)
)

Expand Down Expand Up @@ -261,7 +272,7 @@ defmodule Archethic.BeaconChain.Subset do

# Avoid to store or dispatch an empty beacon's slot
unless Slot.empty?(current_slot) do
current_slot = %{current_slot | slot_time: SlotTimer.previous_slot(time)}
# current_slot = %{current_slot | slot_time: SlotTimer.previous_slot(time)}

if summary_time?(time) do
SummaryCache.add_slot(subset, current_slot)
Expand Down Expand Up @@ -371,12 +382,4 @@ defmodule Archethic.BeaconChain.Subset do
end

defp ensure_p2p_view(slot = %Slot{}, _), do: slot

@doc """
Add node public key to the corresponding subset for beacon updates
"""
@spec subscribe_for_beacon_updates(binary(), Crypto.key()) :: :ok
def subscribe_for_beacon_updates(subset, node_public_key) do
GenServer.cast(via_tuple(subset), {:subscribe_node_to_beacon_updates, node_public_key})
end
end
30 changes: 22 additions & 8 deletions lib/archethic/beacon_chain/summary_timer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,18 @@ defmodule Archethic.BeaconChain.SummaryTimer do
naive_date_from = DateTime.to_naive(date_from)

if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do
cron_expression
|> CronScheduler.get_next_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
case date_from do
%DateTime{microsecond: {0, _}} ->
cron_expression
|> CronScheduler.get_next_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")

_ ->
cron_expression
|> CronScheduler.get_next_run_date!(naive_date_from)
|> DateTime.from_naive!("Etc/UTC")
end
else
cron_expression
|> CronScheduler.get_next_run_date!(naive_date_from)
Expand All @@ -48,10 +56,16 @@ defmodule Archethic.BeaconChain.SummaryTimer do
naive_date_from = DateTime.to_naive(date_from)

if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do
cron_expression
|> CronScheduler.get_previous_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
case date_from do
%DateTime{microsecond: {microsecond, _}} when microsecond > 0 ->
DateTime.truncate(date_from, :second)

_ ->
cron_expression
|> CronScheduler.get_previous_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
end
else
cron_expression
|> CronScheduler.get_previous_run_date!(naive_date_from)
Expand Down
3 changes: 2 additions & 1 deletion lib/archethic/self_repair/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ defmodule Archethic.SelfRepair.Sync do
patch :: binary()
) :: :ok | {:error, :unreachable_nodes}
def load_missed_transactions(last_sync_date = %DateTime{}, patch) when is_binary(patch) do
last_summary_time = BeaconChain.previous_summary_time(DateTime.utc_now())
last_summary_time =
BeaconChain.previous_summary_time(DateTime.utc_now() |> DateTime.truncate(:second))

if last_summary_time > last_sync_date do
Logger.info(
Expand Down

0 comments on commit e4a8031

Please sign in to comment.