Skip to content

Commit

Permalink
Start Scheduler post node :up
Browse files Browse the repository at this point in the history
- fix tests
- change to node_up
  • Loading branch information
apoorv-2204 committed Sep 7, 2022
1 parent bdd8e22 commit c6e1f37
Show file tree
Hide file tree
Showing 16 changed files with 604 additions and 87 deletions.
31 changes: 25 additions & 6 deletions lib/archethic/beacon_chain/slot_timer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ defmodule Archethic.BeaconChain.SlotTimer do

require Logger

@slot_timer_ets :archethic_slot_timer

@doc """
Create a new slot timer
"""
Expand Down Expand Up @@ -85,19 +87,36 @@ defmodule Archethic.BeaconChain.SlotTimer do
end

defp get_interval do
[{_, interval}] = :ets.lookup(:archethic_slot_timer, :interval)
[{_, interval}] = :ets.lookup(@slot_timer_ets, :interval)
interval
end

@doc false
def init(opts) do
:ets.new(@slot_timer_ets, [:named_table, :public, read_concurrency: true])
interval = Keyword.get(opts, :interval)
:ets.new(:archethic_slot_timer, [:named_table, :public, read_concurrency: true])
:ets.insert(:archethic_slot_timer, {:interval, interval})
:ets.insert(@slot_timer_ets, {:interval, interval})

case :persistent_term.get(:archethic_up, nil) do
nil ->
Logger.info("Slot Timer: Waiting for Node to complete Bootstrap.")

Archethic.PubSub.register_to_node_up()
{:ok, %{interval: interval}}

:up ->
Logger.info("Slot Timer: Starting...")

{:ok, %{interval: interval, timer: schedule_new_slot(interval)}}
end
end

def handle_info(:node_up, server_data = %{interval: interval}) do
Logger.info("Slot Timer: Starting...")

Logger.info("Starting SlotTimer")
new_server_data = Map.put(server_data, :timer, schedule_new_slot(interval))

{:ok, %{interval: interval, timer: schedule_new_slot(interval)}, :hibernate}
{:noreply, new_server_data, :hibernate}
end

def handle_info(
Expand Down Expand Up @@ -130,7 +149,7 @@ defmodule Archethic.BeaconChain.SlotTimer do
{:noreply, state}

new_interval ->
:ets.insert(:archethic_slot_timer, {:interval, new_interval})
:ets.insert(@slot_timer_ets, {:interval, new_interval})
{:noreply, Map.put(state, :interval, new_interval)}
end
end
Expand Down
5 changes: 3 additions & 2 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ defmodule Archethic.Bootstrap do
else
post_bootstrap(sync?: false)
end

Logger.info("Bootstrapping finished!")
end

defp should_bootstrap?(_ip, _port, _http_port, _, nil), do: true
Expand Down Expand Up @@ -195,8 +197,6 @@ defmodule Archethic.Bootstrap do
end
end
end

Logger.info("Bootstrapping finished!")
end

defp post_bootstrap(opts) do
Expand All @@ -213,6 +213,7 @@ defmodule Archethic.Bootstrap do
SelfRepair.start_scheduler()

:persistent_term.put(:archethic_up, :up)
Archethic.PubSub.notify_node_up()
Listener.listen()
end

Expand Down
61 changes: 46 additions & 15 deletions lib/archethic/oracle_chain/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,38 +49,63 @@ defmodule Archethic.OracleChain.Scheduler do
polling_interval = Keyword.fetch!(args, :polling_interval)
summary_interval = Keyword.fetch!(args, :summary_interval)

state_data =
%{}
|> Map.put(:polling_interval, polling_interval)
|> Map.put(:summary_interval, summary_interval)

case :persistent_term.get(:archethic_up, nil) do
nil ->
# node still bootstrapping , wait for it to finish Bootstrap
Logger.info(" Oracle Scheduler: Waiting for Node to complete Bootstrap. ")

PubSub.register_to_node_up()

{:ok, :idle, state_data}

# wait for node UP
:up ->
# when node is already bootstrapped, - handles scheduler crash
{state, new_state_data, events} = start_scheduler(state_data)
{:ok, state, new_state_data, events}
end
end

def start_scheduler(state_data) do
Logger.info("Oracle Scheduler: Starting... ")

PubSub.register_to_node_update()
Logger.info("Starting Oracle Scheduler")

case P2P.get_node_info(Crypto.first_node_public_key()) do
# Schedule polling for authorized node
# This case may happen in case of process restart after crash
{:ok, %Node{authorized?: true, available?: true}} ->
summary_date =
next_date(summary_interval, DateTime.utc_now() |> DateTime.truncate(:second))
next_date(
Map.get(state_data, :summary_interval),
DateTime.utc_now() |> DateTime.truncate(:second)
)

PubSub.register_to_new_transaction_by_type(:oracle)

index = chain_size(summary_date)
Logger.info("Oracle Scheduler: Scheduled during init - (index: #{index})")

{:ok, :idle,
%{
polling_interval: polling_interval,
summary_interval: summary_interval,
summary_date: summary_date,
indexes: %{summary_date => index}
}, {:next_event, :internal, :schedule}}
new_state_data =
state_data
|> Map.put(:summary_date, summary_date)
|> Map.put(:indexes, %{summary_date => index})

{:idle, new_state_data, {:next_event, :internal, :schedule}}

_ ->
Logger.info("Oracle Scheduler: waiting for Node Update Message")

{:ok, :idle,
%{
polling_interval: polling_interval,
summary_interval: summary_interval,
indexes: %{}
}}
new_state_data =
state_data
|> Map.put(:indexes, %{})

{:idle, new_state_data, []}
end
end

Expand Down Expand Up @@ -133,6 +158,12 @@ defmodule Archethic.OracleChain.Scheduler do
{:next_state, :scheduled, new_data}
end

def handle_event(:info, :node_up, :idle, state_data) do
PubSub.unregister_to_node_up()
{:idle, new_state_data, events} = start_scheduler(state_data)
{:keep_state, new_state_data, events}
end

def handle_event(
:info,
{:new_transaction, address, :oracle, _timestamp},
Expand Down
24 changes: 24 additions & 0 deletions lib/archethic/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,30 @@ defmodule Archethic.PubSub do
)
end

@doc """
Notify that Node is Up
"""
@spec notify_node_up() :: :ok
def notify_node_up() do
dispatch(:node_up, :node_up)
end

@spec register_to_node_up :: {:error, {:already_registered, pid}} | {:ok, pid}
@doc """
Register a process to notify node up
"""
def register_to_node_up() do
Registry.register(PubSubRegistry, :node_up, [])
end

@doc """
UnRegister a process to notification node up
"""
@spec unregister_to_node_up :: :ok
def unregister_to_node_up() do
Registry.unregister(PubSubRegistry, :node_up)
end

@doc """
Register a process to a new transaction publication by type
"""
Expand Down
76 changes: 40 additions & 36 deletions lib/archethic/reward/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ defmodule Archethic.Reward.Scheduler do

require Logger

@spec start_link(any) :: :ignore | {:error, any} | {:ok, pid}
def start_link(args \\ []) do
GenStateMachine.start_link(__MODULE__, args, name: __MODULE__)
end
Expand All @@ -37,8 +38,30 @@ defmodule Archethic.Reward.Scheduler do

def init(args) do
interval = Keyword.fetch!(args, :interval)
state_data = Map.put(%{}, :interval, interval)

case :persistent_term.get(:archethic_up, nil) do
nil ->
Logger.info(" Reward Scheduler: Waiting for Node to complete Bootstrap. ")

PubSub.register_to_node_up()
{:ok, :idle, state_data}

# wait for node up

:up ->
{state, new_state_data, events} = start_scheduler(state_data)
{:ok, state, new_state_data, events}
end
end

@doc """
Computers start parameters for the scheduler
"""
def start_scheduler(state_data) do
Logger.info("Reward Scheduler: Starting... ")

PubSub.register_to_node_update()
Logger.info("Starting Reward Scheduler")

case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do
%Node{authorized?: true, available?: true} ->
Expand All @@ -48,17 +71,26 @@ defmodule Archethic.Reward.Scheduler do
index = Crypto.number_of_network_pool_keys()
Logger.info("Reward Scheduler scheduled during init - (index: #{index})")

{:ok, :idle,
%{interval: interval, index: index, next_address: Reward.next_address(index)},
{:idle,
state_data
|> Map.put(:index, index)
|> Map.put(:next_address, Reward.next_address(index)),
{:next_event, :internal, :schedule}}

_ ->
Logger.info("Reward Scheduler waiting for Node Update Message")

{:ok, :idle, %{interval: interval}}
{:idle, state_data, []}
end
end

def handle_event(:info, :node_up, :idle, state_data) do
# Node is up start Scheduler
PubSub.unregister_to_node_up()
{:idle, new_state_data, events} = start_scheduler(state_data)
{:keep_state, new_state_data, events}
end

def handle_event(
:info,
{:node_update,
Expand Down Expand Up @@ -138,41 +170,13 @@ defmodule Archethic.Reward.Scheduler do
:triggered,
data = %{index: index}
) do
next_index = index + 1
next_address = Reward.next_address(next_index)

new_data =
data
|> Map.put(:index, next_index)
|> Map.put(:next_address, next_address)

case Map.get(data, :watcher) do
nil ->
:ignore

pid ->
Process.exit(pid, :normal)
end

if Reward.initiator?(next_address) do
send_node_rewards(next_index)
{:keep_state, new_data}
else
{:ok, pid} =
DetectNodeResponsiveness.start_link(next_address, fn count ->
if Reward.initiator?(next_address, count) do
Logger.debug("Node reward creation...attempt #{count}",
transaction_address: Base.encode16(next_address)
)

send_node_rewards(next_index)
end
end)

Process.monitor(pid)
|> Map.update!(:index, &(&1 + 1))
|> Map.put(:next_address, Reward.next_address(index + 1))

{:keep_state, Map.put(data, :watcher, pid)}
end
send_node_rewards(index + 1)
{:keep_state, new_data}
end

def handle_event(
Expand Down
1 change: 1 addition & 0 deletions lib/archethic/reward/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Archethic.Reward.Supervisor do
alias Archethic.Reward.MemTablesLoader
alias Archethic.Reward.MemTables.RewardTokens

@spec start_link(any) :: :ignore | {:error, any} | {:ok, pid}
def start_link(args \\ []) do
Supervisor.start_link(__MODULE__, args, name: Archethic.RewardSupervisor)
end
Expand Down
38 changes: 31 additions & 7 deletions lib/archethic/shared_secrets/node_renewal_scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,43 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do
end

@doc false
def init(opts) do
interval = Keyword.get(opts, :interval)
PubSub.register_to_node_update()
def init(args) do
interval = Keyword.get(args, :interval)
state_data = Map.put(%{}, :interval, interval)

case :persistent_term.get(:archethic_up, nil) do
nil ->
Logger.info("Node Renewal Scheduler: Waiting for node to complete Bootstrap. ")

Logger.info("Starting Node Renewal Scheduler")
PubSub.register_to_node_up()
{:ok, :idle, state_data}

# wait for node ups
:up ->
{state, new_state_data, events} = start_scheduler(state_data)
{:ok, state, new_state_data, events}
end
end

def start_scheduler(state_data) do
Logger.info("Node Renewal Scheduler: Starting... ")

PubSub.register_to_node_update()

case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do
%Node{authorized?: true, available?: true} ->
PubSub.register_to_new_transaction_by_type(:node_shared_secrets)
Logger.info("Node Renewal Scheduler: Scheduled during init")

key_index = Crypto.number_of_node_shared_secrets_keys()
new_state_data = state_data |> Map.put(:index, key_index)

{:ok, :idle, %{interval: interval, index: key_index},
[{:next_event, :internal, :schedule}]}
{:idle, new_state_data, [{:next_event, :internal, :schedule}]}

_ ->
Logger.info("Node Renewal Scheduler: Scheduler waiting for Node Update Message")

{:ok, :idle, %{interval: interval}}
{:idle, state_data, []}
end
end

Expand All @@ -90,6 +107,13 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do
{:next_state, :scheduled, new_data}
end

def handle_event(:info, :node_up, :idle, state_data) do
PubSub.unregister_to_node_up()
# Node is Up start Scheduler
{:idle, new_state_data, events} = start_scheduler(state_data)
{:keep_state, new_state_data, events}
end

def handle_event(
:info,
{:node_update,
Expand Down
Loading

0 comments on commit c6e1f37

Please sign in to comment.