Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start Schedulers Post Bootstrap #543 #552

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions lib/archethic/beacon_chain/slot_timer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ defmodule Archethic.BeaconChain.SlotTimer do

require Logger

@slot_timer_ets :archethic_slot_timer

@doc """
Create a new slot timer
"""
Expand Down Expand Up @@ -85,19 +87,36 @@ defmodule Archethic.BeaconChain.SlotTimer do
end

defp get_interval do
[{_, interval}] = :ets.lookup(:archethic_slot_timer, :interval)
[{_, interval}] = :ets.lookup(@slot_timer_ets, :interval)
interval
end

@doc false
def init(opts) do
:ets.new(@slot_timer_ets, [:named_table, :public, read_concurrency: true])
interval = Keyword.get(opts, :interval)
:ets.new(:archethic_slot_timer, [:named_table, :public, read_concurrency: true])
:ets.insert(:archethic_slot_timer, {:interval, interval})
:ets.insert(@slot_timer_ets, {:interval, interval})

case :persistent_term.get(:archethic_up, nil) do
nil ->
Logger.info("Slot Timer: Waiting for Node to complete Bootstrap.")

Archethic.PubSub.register_to_node_up()
{:ok, %{interval: interval}}

:up ->
Logger.info("Slot Timer: Starting...")

{:ok, %{interval: interval, timer: schedule_new_slot(interval)}}
end
end

def handle_info(:node_up, server_data = %{interval: interval}) do
Logger.info("Slot Timer: Starting...")

Logger.info("Starting SlotTimer")
new_server_data = Map.put(server_data, :timer, schedule_new_slot(interval))

{:ok, %{interval: interval, timer: schedule_new_slot(interval)}, :hibernate}
{:noreply, new_server_data, :hibernate}
end

def handle_info(
Expand Down Expand Up @@ -130,7 +149,7 @@ defmodule Archethic.BeaconChain.SlotTimer do
{:noreply, state}

new_interval ->
:ets.insert(:archethic_slot_timer, {:interval, new_interval})
:ets.insert(@slot_timer_ets, {:interval, new_interval})
{:noreply, Map.put(state, :interval, new_interval)}
end
end
Expand Down
5 changes: 3 additions & 2 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ defmodule Archethic.Bootstrap do
else
post_bootstrap(sync?: false)
end

Logger.info("Bootstrapping finished!")
end

defp should_bootstrap?(_ip, _port, _http_port, _, nil), do: true
Expand Down Expand Up @@ -195,8 +197,6 @@ defmodule Archethic.Bootstrap do
end
end
end

Logger.info("Bootstrapping finished!")
end

defp post_bootstrap(opts) do
Expand All @@ -213,6 +213,7 @@ defmodule Archethic.Bootstrap do
SelfRepair.start_scheduler()

:persistent_term.put(:archethic_up, :up)
Archethic.PubSub.notify_node_up()
Listener.listen()
end

Expand Down
61 changes: 46 additions & 15 deletions lib/archethic/oracle_chain/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,38 +49,63 @@ defmodule Archethic.OracleChain.Scheduler do
polling_interval = Keyword.fetch!(args, :polling_interval)
summary_interval = Keyword.fetch!(args, :summary_interval)

state_data =
%{}
|> Map.put(:polling_interval, polling_interval)
|> Map.put(:summary_interval, summary_interval)

case :persistent_term.get(:archethic_up, nil) do
nil ->
# node still bootstrapping , wait for it to finish Bootstrap
Logger.info(" Oracle Scheduler: Waiting for Node to complete Bootstrap. ")

PubSub.register_to_node_up()

{:ok, :idle, state_data}

# wait for node UP
:up ->
# when node is already bootstrapped, - handles scheduler crash
{state, new_state_data, events} = start_scheduler(state_data)
{:ok, state, new_state_data, events}
end
end

def start_scheduler(state_data) do
Logger.info("Oracle Scheduler: Starting... ")

PubSub.register_to_node_update()
Logger.info("Starting Oracle Scheduler")

case P2P.get_node_info(Crypto.first_node_public_key()) do
# Schedule polling for authorized node
# This case may happen in case of process restart after crash
{:ok, %Node{authorized?: true, available?: true}} ->
summary_date =
next_date(summary_interval, DateTime.utc_now() |> DateTime.truncate(:second))
next_date(
Map.get(state_data, :summary_interval),
DateTime.utc_now() |> DateTime.truncate(:second)
)

PubSub.register_to_new_transaction_by_type(:oracle)

index = chain_size(summary_date)
Logger.info("Oracle Scheduler: Scheduled during init - (index: #{index})")

{:ok, :idle,
%{
polling_interval: polling_interval,
summary_interval: summary_interval,
summary_date: summary_date,
indexes: %{summary_date => index}
}, {:next_event, :internal, :schedule}}
new_state_data =
state_data
|> Map.put(:summary_date, summary_date)
|> Map.put(:indexes, %{summary_date => index})

{:idle, new_state_data, {:next_event, :internal, :schedule}}

_ ->
Logger.info("Oracle Scheduler: waiting for Node Update Message")

{:ok, :idle,
%{
polling_interval: polling_interval,
summary_interval: summary_interval,
indexes: %{}
}}
new_state_data =
state_data
|> Map.put(:indexes, %{})

{:idle, new_state_data, []}
end
end

Expand Down Expand Up @@ -133,6 +158,12 @@ defmodule Archethic.OracleChain.Scheduler do
{:next_state, :scheduled, new_data}
end

def handle_event(:info, :node_up, :idle, state_data) do
PubSub.unregister_to_node_up()
{:idle, new_state_data, events} = start_scheduler(state_data)
{:keep_state, new_state_data, events}
end

def handle_event(
:info,
{:new_transaction, address, :oracle, _timestamp},
Expand Down
24 changes: 24 additions & 0 deletions lib/archethic/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,30 @@ defmodule Archethic.PubSub do
)
end

@doc """
Notify that Node is Up
"""
@spec notify_node_up() :: :ok
def notify_node_up() do
dispatch(:node_up, :node_up)
end

@spec register_to_node_up :: {:error, {:already_registered, pid}} | {:ok, pid}
@doc """
Register a process to notify node up
"""
def register_to_node_up() do
Registry.register(PubSubRegistry, :node_up, [])
end

@doc """
UnRegister a process to notification node up
"""
@spec unregister_to_node_up :: :ok
def unregister_to_node_up() do
Registry.unregister(PubSubRegistry, :node_up)
end

@doc """
Register a process to a new transaction publication by type
"""
Expand Down
76 changes: 40 additions & 36 deletions lib/archethic/reward/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ defmodule Archethic.Reward.Scheduler do

require Logger

@spec start_link(any) :: :ignore | {:error, any} | {:ok, pid}
def start_link(args \\ []) do
GenStateMachine.start_link(__MODULE__, args, name: __MODULE__)
end
Expand All @@ -37,8 +38,30 @@ defmodule Archethic.Reward.Scheduler do

def init(args) do
interval = Keyword.fetch!(args, :interval)
state_data = Map.put(%{}, :interval, interval)

case :persistent_term.get(:archethic_up, nil) do
nil ->
Logger.info(" Reward Scheduler: Waiting for Node to complete Bootstrap. ")

PubSub.register_to_node_up()
{:ok, :idle, state_data}

# wait for node up

:up ->
{state, new_state_data, events} = start_scheduler(state_data)
{:ok, state, new_state_data, events}
end
end

@doc """
Computers start parameters for the scheduler
"""
def start_scheduler(state_data) do
Logger.info("Reward Scheduler: Starting... ")

PubSub.register_to_node_update()
Logger.info("Starting Reward Scheduler")

case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do
%Node{authorized?: true, available?: true} ->
Expand All @@ -48,17 +71,26 @@ defmodule Archethic.Reward.Scheduler do
index = Crypto.number_of_network_pool_keys()
Logger.info("Reward Scheduler scheduled during init - (index: #{index})")

{:ok, :idle,
%{interval: interval, index: index, next_address: Reward.next_address(index)},
{:idle,
state_data
|> Map.put(:index, index)
|> Map.put(:next_address, Reward.next_address(index)),
{:next_event, :internal, :schedule}}

_ ->
Logger.info("Reward Scheduler waiting for Node Update Message")

{:ok, :idle, %{interval: interval}}
{:idle, state_data, []}
end
end

def handle_event(:info, :node_up, :idle, state_data) do
# Node is up start Scheduler
PubSub.unregister_to_node_up()
{:idle, new_state_data, events} = start_scheduler(state_data)
{:keep_state, new_state_data, events}
end

def handle_event(
:info,
{:node_update,
Expand Down Expand Up @@ -138,41 +170,13 @@ defmodule Archethic.Reward.Scheduler do
:triggered,
data = %{index: index}
) do
next_index = index + 1
next_address = Reward.next_address(next_index)

new_data =
data
|> Map.put(:index, next_index)
|> Map.put(:next_address, next_address)

case Map.get(data, :watcher) do
nil ->
:ignore

pid ->
Process.exit(pid, :normal)
end

if Reward.initiator?(next_address) do
send_node_rewards(next_index)
{:keep_state, new_data}
else
{:ok, pid} =
DetectNodeResponsiveness.start_link(next_address, fn count ->
if Reward.initiator?(next_address, count) do
Logger.debug("Node reward creation...attempt #{count}",
transaction_address: Base.encode16(next_address)
)

send_node_rewards(next_index)
end
end)

Process.monitor(pid)
|> Map.update!(:index, &(&1 + 1))
|> Map.put(:next_address, Reward.next_address(index + 1))

{:keep_state, Map.put(data, :watcher, pid)}
end
send_node_rewards(index + 1)
{:keep_state, new_data}
end

def handle_event(
Expand Down
1 change: 1 addition & 0 deletions lib/archethic/reward/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Archethic.Reward.Supervisor do
alias Archethic.Reward.MemTablesLoader
alias Archethic.Reward.MemTables.RewardTokens

@spec start_link(any) :: :ignore | {:error, any} | {:ok, pid}
def start_link(args \\ []) do
Supervisor.start_link(__MODULE__, args, name: Archethic.RewardSupervisor)
end
Expand Down
38 changes: 31 additions & 7 deletions lib/archethic/shared_secrets/node_renewal_scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,43 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do
end

@doc false
def init(opts) do
interval = Keyword.get(opts, :interval)
PubSub.register_to_node_update()
def init(args) do
interval = Keyword.get(args, :interval)
state_data = Map.put(%{}, :interval, interval)

case :persistent_term.get(:archethic_up, nil) do
nil ->
Logger.info("Node Renewal Scheduler: Waiting for node to complete Bootstrap. ")

Logger.info("Starting Node Renewal Scheduler")
PubSub.register_to_node_up()
{:ok, :idle, state_data}

# wait for node ups
:up ->
{state, new_state_data, events} = start_scheduler(state_data)
{:ok, state, new_state_data, events}
end
end

def start_scheduler(state_data) do
Logger.info("Node Renewal Scheduler: Starting... ")

PubSub.register_to_node_update()

case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do
%Node{authorized?: true, available?: true} ->
PubSub.register_to_new_transaction_by_type(:node_shared_secrets)
Logger.info("Node Renewal Scheduler: Scheduled during init")

key_index = Crypto.number_of_node_shared_secrets_keys()
new_state_data = state_data |> Map.put(:index, key_index)

{:ok, :idle, %{interval: interval, index: key_index},
[{:next_event, :internal, :schedule}]}
{:idle, new_state_data, [{:next_event, :internal, :schedule}]}

_ ->
Logger.info("Node Renewal Scheduler: Scheduler waiting for Node Update Message")

{:ok, :idle, %{interval: interval}}
{:idle, state_data, []}
end
end

Expand All @@ -90,6 +107,13 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do
{:next_state, :scheduled, new_data}
end

def handle_event(:info, :node_up, :idle, state_data) do
PubSub.unregister_to_node_up()
# Node is Up start Scheduler
{:idle, new_state_data, events} = start_scheduler(state_data)
{:keep_state, new_state_data, events}
end

def handle_event(
:info,
{:node_update,
Expand Down
Loading