Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix beacon date projections #653

Merged
merged 5 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 57 additions & 29 deletions lib/archethic/beacon_chain/slot_timer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,54 @@ defmodule Archethic.BeaconChain.SlotTimer do
Give the next beacon chain slot using the `SlotTimer` interval
"""
@spec next_slot(DateTime.t()) :: DateTime.t()
def next_slot(date_from = %DateTime{microsecond: {0, 0}}) do
get_interval()
|> CronParser.parse!(true)
|> CronScheduler.get_next_run_dates(DateTime.to_naive(date_from))
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
end

def next_slot(date_from = %DateTime{}) do
get_interval()
|> CronParser.parse!(true)
|> CronScheduler.get_next_run_date!(DateTime.to_naive(date_from))
|> DateTime.from_naive!("Etc/UTC")
cron_expression = CronParser.parse!(get_interval(), true)
naive_date_from = DateTime.to_naive(date_from)

if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do
case date_from do
%DateTime{microsecond: {0, _}} ->
cron_expression
|> CronScheduler.get_next_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")

_ ->
cron_expression
|> CronScheduler.get_next_run_date!(naive_date_from)
|> DateTime.from_naive!("Etc/UTC")
end
else
cron_expression
|> CronScheduler.get_next_run_date!(naive_date_from)
|> DateTime.from_naive!("Etc/UTC")
end
end

@doc """
Returns the previous slot from the given date
"""
@spec previous_slot(DateTime.t()) :: DateTime.t()
def previous_slot(date_from = %DateTime{microsecond: {0, 0}}) do
get_interval()
|> CronParser.parse!(true)
|> CronScheduler.get_previous_run_dates(DateTime.to_naive(date_from))
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
end

def previous_slot(date_from = %DateTime{}) do
get_interval()
|> CronParser.parse!(true)
|> CronScheduler.get_previous_run_date!(DateTime.to_naive(date_from))
|> DateTime.from_naive!("Etc/UTC")
cron_expression = CronParser.parse!(get_interval(), true)
naive_date_from = DateTime.to_naive(date_from)

if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do
case date_from do
%DateTime{microsecond: {microsecond, _}} when microsecond > 0 ->
DateTime.truncate(date_from, :second)

_ ->
cron_expression
|> CronScheduler.get_previous_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
end
else
cron_expression
|> CronScheduler.get_previous_run_date!(naive_date_from)
|> DateTime.from_naive!("Etc/UTC")
end
end

@doc """
Expand Down Expand Up @@ -113,28 +129,33 @@ defmodule Archethic.BeaconChain.SlotTimer do

:up ->
Logger.info("Slot Timer: Starting...")
next_time = next_slot(DateTime.utc_now() |> DateTime.truncate(:second))

{:ok, %{interval: interval, timer: schedule_new_slot(interval)}}
{:ok, %{interval: interval, timer: schedule_new_slot(interval), next_time: next_time}}
end
end

def handle_info(:node_up, server_data = %{interval: interval}) do
Logger.info("Slot Timer: Starting...")

new_server_data = Map.put(server_data, :timer, schedule_new_slot(interval))
new_server_data =
server_data
|> Map.put(:timer, schedule_new_slot(interval))
|> Map.put(:next_time, next_slot(DateTime.utc_now() |> DateTime.truncate(:second)))
samuelmanzanera marked this conversation as resolved.
Show resolved Hide resolved

{:noreply, new_server_data, :hibernate}
end

def handle_info(
:new_slot,
state = %{
interval: interval
interval: interval,
next_time: next_time
}
) do
timer = schedule_new_slot(interval)

slot_time = DateTime.utc_now() |> DateTime.truncate(:millisecond)
slot_time = next_time

PubSub.notify_current_epoch_of_slot_timer(slot_time)

Expand All @@ -153,7 +174,14 @@ defmodule Archethic.BeaconChain.SlotTimer do
:skip
end

{:noreply, Map.put(state, :timer, timer), :hibernate}
next_time = next_slot(next_time)

new_state =
state
|> Map.put(:timer, timer)
|> Map.put(:next_time, next_time)

{:noreply, new_state, :hibernate}
end

def handle_cast({:new_conf, conf}, state) do
Expand Down
25 changes: 14 additions & 11 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ defmodule Archethic.BeaconChain.Subset do
GenServer.cast(via_tuple(subset), {:add_slot, slot, node_public_key, signature})
end

@doc """
Add node public key to the corresponding subset for beacon updates
"""
@spec subscribe_for_beacon_updates(binary(), Crypto.key()) :: :ok
def subscribe_for_beacon_updates(subset, node_public_key) do
GenServer.cast(via_tuple(subset), {:subscribe_node_to_beacon_updates, node_public_key})
end

@doc """
Get the current slot
"""
Expand All @@ -77,7 +85,10 @@ defmodule Archethic.BeaconChain.Subset do
%{
node_public_key: Crypto.first_node_public_key(),
subset: subset,
current_slot: %Slot{subset: subset, slot_time: SlotTimer.next_slot(DateTime.utc_now())},
current_slot: %Slot{
subset: subset,
slot_time: SlotTimer.next_slot(DateTime.utc_now() |> DateTime.truncate(:second))
samuelmanzanera marked this conversation as resolved.
Show resolved Hide resolved
},
subscribed_nodes: [],
postponed: %{end_of_sync: [], transaction_attestations: []}
}}
Expand Down Expand Up @@ -218,7 +229,7 @@ defmodule Archethic.BeaconChain.Subset do
new_state = update_in(state, [:postponed, :transaction_attestations], &[attestation | &1])

Logger.info(
"Transaction #{type}@#{Base.encode16(address)} will be added to the next beacon chain (#{DateTime.to_string(next_slot_time)} slot)",
"Transaction #{type}@#{Base.encode16(address)} will be added to the next beacon chain (#{DateTime.to_string(next_slot_time)} slot) - tx timestamp: #{timestamp} - current slot time: #{slot_time}",
beacon_subset: Base.encode16(subset)
)

Expand Down Expand Up @@ -261,7 +272,7 @@ defmodule Archethic.BeaconChain.Subset do

# Avoid to store or dispatch an empty beacon's slot
unless Slot.empty?(current_slot) do
current_slot = %{current_slot | slot_time: SlotTimer.previous_slot(time)}
# current_slot = %{current_slot | slot_time: SlotTimer.previous_slot(time)}

if summary_time?(time) do
SummaryCache.add_slot(subset, current_slot)
Expand Down Expand Up @@ -371,12 +382,4 @@ defmodule Archethic.BeaconChain.Subset do
end

defp ensure_p2p_view(slot = %Slot{}, _), do: slot

@doc """
Add node public key to the corresponding subset for beacon updates
"""
@spec subscribe_for_beacon_updates(binary(), Crypto.key()) :: :ok
def subscribe_for_beacon_updates(subset, node_public_key) do
GenServer.cast(via_tuple(subset), {:subscribe_node_to_beacon_updates, node_public_key})
end
end
30 changes: 22 additions & 8 deletions lib/archethic/beacon_chain/summary_timer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,18 @@ defmodule Archethic.BeaconChain.SummaryTimer do
naive_date_from = DateTime.to_naive(date_from)

if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do
cron_expression
|> CronScheduler.get_next_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
case date_from do
%DateTime{microsecond: {0, _}} ->
cron_expression
|> CronScheduler.get_next_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")

_ ->
cron_expression
|> CronScheduler.get_next_run_date!(naive_date_from)
|> DateTime.from_naive!("Etc/UTC")
end
else
cron_expression
|> CronScheduler.get_next_run_date!(naive_date_from)
Expand All @@ -48,10 +56,16 @@ defmodule Archethic.BeaconChain.SummaryTimer do
naive_date_from = DateTime.to_naive(date_from)

if Crontab.DateChecker.matches_date?(cron_expression, naive_date_from) do
cron_expression
|> CronScheduler.get_previous_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
case date_from do
%DateTime{microsecond: {microsecond, _}} when microsecond > 0 ->
DateTime.truncate(date_from, :second)

_ ->
cron_expression
|> CronScheduler.get_previous_run_dates(naive_date_from)
|> Enum.at(1)
|> DateTime.from_naive!("Etc/UTC")
end
else
cron_expression
|> CronScheduler.get_previous_run_date!(naive_date_from)
Expand Down
3 changes: 2 additions & 1 deletion lib/archethic/self_repair/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ defmodule Archethic.SelfRepair.Sync do
patch :: binary()
) :: :ok | {:error, :unreachable_nodes}
def load_missed_transactions(last_sync_date = %DateTime{}, patch) when is_binary(patch) do
last_summary_time = BeaconChain.previous_summary_time(DateTime.utc_now())
last_summary_time =
BeaconChain.previous_summary_time(DateTime.utc_now() |> DateTime.truncate(:second))
samuelmanzanera marked this conversation as resolved.
Show resolved Hide resolved

if last_summary_time > last_sync_date do
Logger.info(
Expand Down