Skip to content

Commit

Permalink
Boot Scheduler Post node Bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorv-2204 committed Aug 30, 2022
1 parent 058484b commit e662a0c
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 24 deletions.
3 changes: 3 additions & 0 deletions lib/archethic/account/mem_tables_loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ defmodule Archethic.Account.MemTablesLoader do

{{to, _token_address}, utxo} ->
TokenLedger.add_unspent_output(to, utxo, timestamp)

{{to, _token_address, _token_id}, utxo} ->
TokenLedger.add_unspent_output(to, utxo, timestamp)
end)
end

Expand Down
1 change: 1 addition & 0 deletions lib/archethic/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ defmodule Archethic.Bootstrap do

:persistent_term.put(:archethic_up, :up)
Listener.listen()
Archethic.PubSub.notify_node_up(:persistent_term.get(:archethic_up, nil))
end

defp first_initialization(
Expand Down
29 changes: 25 additions & 4 deletions lib/archethic/oracle_chain/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,27 @@ defmodule Archethic.OracleChain.Scheduler do
end

def init(args) do
case :persistent_term.get(:archethic_up, nil) do
:up ->
{state, server_data, events} = start_scheduler(args)
{:ok, state, server_data, events}

nil ->
Logger.info(" Waiting for Node to complete Bootstrap. ", Scheduler: :Oracle_Scheduler)

Archethic.PubSub.register_to_node_up()
{:ok, :idle, %{args: args}}
# wait for node UP
end
end

def start_scheduler(args) do
Logger.info(" Starting... ", Scheduler: :Oracle_Scheduler)

polling_interval = Keyword.fetch!(args, :polling_interval)
summary_interval = Keyword.fetch!(args, :summary_interval)

PubSub.register_to_node_update()
Logger.info("Starting Oracle Scheduler")

case P2P.get_node_info(Crypto.first_node_public_key()) do
# Schedule polling for authorized node
Expand All @@ -64,7 +80,7 @@ defmodule Archethic.OracleChain.Scheduler do
index = chain_size(summary_date)
Logger.info("Oracle Scheduler: Scheduled during init - (index: #{index})")

{:ok, :idle,
{:idle,
%{
polling_interval: polling_interval,
summary_interval: summary_interval,
Expand All @@ -75,15 +91,20 @@ defmodule Archethic.OracleChain.Scheduler do
_ ->
Logger.info("Oracle Scheduler: waiting for Node Update Message")

{:ok, :idle,
{:idle,
%{
polling_interval: polling_interval,
summary_interval: summary_interval,
indexes: %{}
}}
}, []}
end
end

def handle_event(:info, {:node_up, :up}, :idle, %{args: args}) do
{:idle, server_data, events} = start_scheduler(args)
{:keep_state, server_data, events}
end

def handle_event(
:internal,
:schedule,
Expand Down
24 changes: 24 additions & 0 deletions lib/archethic/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,30 @@ defmodule Archethic.PubSub do
)
end

@doc """
Notify that Node is Up
"""
@spec notify_node_up(:up | nil) :: :ok
def notify_node_up(status) do
dispatch(:node_up, {:node_up, status})
end

@spec register_to_node_up :: {:error, {:already_registered, pid}} | {:ok, pid}
@doc """
Register a process to notify node up
"""
def register_to_node_up() do
Registry.register(PubSubRegistry, :node_up, [])
end

@doc """
UnRegister a process to notification node up
"""
@spec unregister_to_node_up :: :ok
def unregister_to_node_up() do
Registry.unregister(PubSubRegistry, :node_up)
end

@doc """
Register a process to a new transaction publication by type
"""
Expand Down
30 changes: 25 additions & 5 deletions lib/archethic/reward/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,25 @@ defmodule Archethic.Reward.Scheduler do
end

def init(args) do
case :persistent_term.get(:archethic_up, nil) do
:up ->
{state, server_data, events} = start_scheduler(args)
{:ok, state, server_data, events}

nil ->
Logger.info(" Waiting for Node to complete Bootstrap. ", Scheduler: :Reward_Scheduler)

Archethic.PubSub.register_to_node_up()
{:ok, :idle, %{args: args, interval: nil, index: nil, next_address: nil}}
# wait for node up
end
end

def start_scheduler(args) do
Logger.info(" Starting... ", Scheduler: :Reward_Scheduler)

interval = Keyword.fetch!(args, :interval)
PubSub.register_to_node_update()
Logger.info("Starting Reward Scheduler")

case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do
%Node{authorized?: true, available?: true} ->
Expand All @@ -48,17 +64,21 @@ defmodule Archethic.Reward.Scheduler do
index = Crypto.number_of_network_pool_keys()
Logger.info("Reward Scheduler scheduled during init - (index: #{index})")

{:ok, :idle,
%{interval: interval, index: index, next_address: Reward.next_address(index)},
{:idle,
%{args: nil, interval: interval, index: index, next_address: Reward.next_address(index)},
{:next_event, :internal, :schedule}}

_ ->
Logger.info("Reward Scheduler waitng for Node Update Message")

{:ok, :idle, %{interval: interval}}
{:idle, %{args: nil, interval: interval}, []}
end
end

{:ok, :idle, %{interval: interval}}
def handle_event(:info, {:node_up, :up}, :idle, _data = %{args: args}) do
# Node is Up start Scheduler
{:idle, server_data, events} = start_scheduler(args)
{:keep_state, server_data, events}
end

def handle_event(
Expand Down
1 change: 1 addition & 0 deletions lib/archethic/reward/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Archethic.Reward.Supervisor do
alias Archethic.Reward.MemTablesLoader
alias Archethic.Reward.MemTables.RewardTokens

@spec start_link(any) :: :ignore | {:error, any} | {:ok, pid}
def start_link(args \\ []) do
Supervisor.start_link(__MODULE__, args, name: Archethic.RewardSupervisor)
end
Expand Down
36 changes: 29 additions & 7 deletions lib/archethic/shared_secrets/node_renewal_scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,28 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do
end

@doc false
def init(opts) do
interval = Keyword.get(opts, :interval)
PubSub.register_to_node_update()
def init(args) do
case :persistent_term.get(:archethic_up, nil) do
:up ->
{state, server_data, events} = start_scheduler(args)
{:ok, state, server_data, events}

nil ->
Logger.info(" Waiting for node to complete Bootstrap. ",
scheduler: :Node_Renewal_Scheduler
)

Archethic.PubSub.register_to_node_up()
{:ok, :idle, %{args: args}}
# wait for node up
end
end

def start_scheduler(args) do
Logger.info(" Starting... ", Scheduler: :Node_Renewal_Scheduler)

Logger.info("Starting Node Renewal Scheduler")
interval = Keyword.get(args, :interval)
PubSub.register_to_node_update()

case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do
%Node{authorized?: true, available?: true} ->
Expand All @@ -57,16 +74,21 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do

key_index = Crypto.number_of_node_shared_secrets_keys()

{:ok, :idle, %{interval: interval, index: key_index},
[{:next_event, :internal, :schedule}]}
{:idle, %{interval: interval, index: key_index}, [{:next_event, :internal, :schedule}]}

_ ->
Logger.info("Node Renewal Scheduler: Scheduler waiting for Node Update Message")

{:ok, :idle, %{interval: interval}}
{:idle, %{interval: interval}, []}
end
end

def handle_event(:info, {:node_up, :up}, :idle, %{args: args}) do
# Node is Up start Scheduler
{:idle, server_data, events} = start_scheduler(args)
{:keep_state, server_data, events}
end

def handle_event(:internal, :schedule, _state, data = %{interval: interval}) do
timer =
case Map.get(data, :timer) do
Expand Down
2 changes: 1 addition & 1 deletion src/c/nat/miniupnp
24 changes: 21 additions & 3 deletions test/archethic/mining/pending_transaction_validation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,13 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do

NetworkLookup.set_network_pool_address(address)

Scheduler.start_link(interval: "0 * * * * *")
{:ok, pid} = Scheduler.start_link(interval: "0 * * * * *")

assert {:idle, %{args: [interval: "0 * * * * *"]}} = :sys.get_state(pid)

send(pid, {:node_up, :up})

assert {:idle, %{interval: "0 * * * * *"}} = :sys.get_state(pid)

MockDB
|> stub(:get_latest_burned_fees, fn -> 300_000_000 end)
Expand Down Expand Up @@ -391,7 +397,13 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do

NetworkLookup.set_network_pool_address(address)

Scheduler.start_link(interval: "0 * * * * *")
{:ok, pid} = Scheduler.start_link(interval: "0 * * * * *")

assert {:idle, %{args: [interval: "0 * * * * *"]}} = :sys.get_state(pid)

send(pid, {:node_up, :up})

assert {:idle, %{interval: "0 * * * * *"}} = :sys.get_state(pid)

MockDB
|> stub(:get_latest_burned_fees, fn -> 200_000_000 end)
Expand Down Expand Up @@ -424,7 +436,13 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do

NetworkLookup.set_network_pool_address(:crypto.strong_rand_bytes(32))

Scheduler.start_link(interval: "0 * * * * *")
{:ok, pid} = Scheduler.start_link(interval: "0 * * * * *")

assert {:idle, %{args: [interval: "0 * * * * *"]}} = :sys.get_state(pid)

send(pid, {:node_up, :up})

assert {:idle, %{interval: "0 * * * * *"}} = :sys.get_state(pid)

MockDB
|> stub(:get_latest_burned_fees, fn -> 300_000_000 end)
Expand Down
37 changes: 34 additions & 3 deletions test/archethic/oracle_chain/scheduler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule Archethic.OracleChain.SchedulerTest do
:ok
end

describe "when receives a poll message" do
describe "Oracle Scheduler: when receives a poll message" do
setup do
me = self()

Expand Down Expand Up @@ -80,7 +80,14 @@ defmodule Archethic.OracleChain.SchedulerTest do
{:ok, pid} =
Scheduler.start_link([polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"], [])

assert {:idle, _} = :sys.get_state(pid)
assert {:idle, %{args: [polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"]}} =
:sys.get_state(pid)

send(pid, {:node_up, :up})

assert {:idle,
%{indexes: %{}, polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"}} =
:sys.get_state(pid)

P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
Expand Down Expand Up @@ -131,6 +138,15 @@ defmodule Archethic.OracleChain.SchedulerTest do
{:ok, pid} =
Scheduler.start_link([polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"], [])

assert {:idle, %{args: [polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"]}} =
:sys.get_state(pid)

send(pid, {:node_up, :up})

assert {:idle,
%{indexes: %{}, polling_interval: "0 * * * *", summary_interval: "0 0 0 * *"}} =
:sys.get_state(pid)

P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3002,
Expand Down Expand Up @@ -170,6 +186,15 @@ defmodule Archethic.OracleChain.SchedulerTest do
{:ok, pid} =
Scheduler.start_link([polling_interval: "0 0 0 * *", summary_interval: "0 0 0 * *"], [])

assert {:idle, %{args: [polling_interval: "0 0 0 * *", summary_interval: "0 0 0 * *"]}} =
:sys.get_state(pid)

send(pid, {:node_up, :up})

assert {:idle,
%{indexes: %{}, polling_interval: "0 0 0 * *", summary_interval: "0 0 0 * *"}} =
:sys.get_state(pid)

P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3002,
Expand Down Expand Up @@ -260,7 +285,13 @@ defmodule Archethic.OracleChain.SchedulerTest do
{:ok, pid} =
Scheduler.start_link([polling_interval: "0 * * * * *", summary_interval: "0 0 0 * *"], [])

assert {:idle, _} = :sys.get_state(pid)
assert {:idle, %{args: args}} = :sys.get_state(pid)

send(pid, {:node_up, :up})

assert {:idle,
%{indexes: %{}, polling_interval: "0 * * * * *", summary_interval: "0 0 0 * *"}} =
:sys.get_state(pid)

P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
Expand Down
9 changes: 8 additions & 1 deletion test/archethic/reward/scheduler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ defmodule Archethic.Reward.SchedulerTest do

assert {:ok, pid} = Scheduler.start_link(interval: "*/1 * * * * *")

assert {:idle, %{interval: "*/1 * * * * *"}} = :sys.get_state(pid)
assert {:idle, %{interval: nil, index: nil, next_address: nil}} = :sys.get_state(pid)

send(pid, {:node_up, :up})

# as node is authorized and available
assert {:scheduled, %{interval: "*/1 * * * * *"}} = :sys.get_state(pid)

send(
pid,
Expand All @@ -57,6 +62,7 @@ defmodule Archethic.Reward.SchedulerTest do
me = self()

assert {:ok, pid} = Scheduler.start_link(interval: "*/1 * * * * *")
send(pid, {:node_up, :up})

MockClient
|> stub(:send_message, fn
Expand Down Expand Up @@ -94,6 +100,7 @@ defmodule Archethic.Reward.SchedulerTest do
end)

assert {:ok, pid} = Scheduler.start_link(interval: "*/1 * * * * *")
send(pid, {:node_up, :up})

send(
pid,
Expand Down
Loading

0 comments on commit e662a0c

Please sign in to comment.