Skip to content

Commit

Permalink
put common args in init
Browse files Browse the repository at this point in the history
fix tests
  • Loading branch information
apoorv-2204 committed Sep 3, 2022
1 parent 89b6235 commit c215943
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 96 deletions.
58 changes: 30 additions & 28 deletions lib/archethic/oracle_chain/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,65 +46,67 @@ defmodule Archethic.OracleChain.Scheduler do
end

def init(args) do
polling_interval = Keyword.fetch!(args, :polling_interval)
summary_interval = Keyword.fetch!(args, :summary_interval)

state_data =
%{}
|> Map.put(:polling_interval, polling_interval)
|> Map.put(:summary_interval, summary_interval)

case :persistent_term.get(:archethic_up, nil) do
:up ->
# when node is already bootstrapped, - handles scheduler crash
{state, server_data, events} = start_scheduler(args)
{:ok, state, server_data, events}
{state, new_state_data, events} = start_scheduler(state_data)
{:ok, state, new_state_data, events}

nil ->
# node still bootstrapping , wait for it to finish Bootstrap
Logger.info(" Oracle Scheduler: Waiting for Node to complete Bootstrap. ")

Archethic.PubSub.register_to_node_up()

{:ok, :idle, %{args: args}}
{:ok, :idle, state_data}

# wait for node UP
end
end

def start_scheduler(args) do
def start_scheduler(state_data) do
Logger.info("Oracle Scheduler: Starting... ")

polling_interval = Keyword.fetch!(args, :polling_interval)
summary_interval = Keyword.fetch!(args, :summary_interval)

PubSub.register_to_node_update()

case P2P.get_node_info(Crypto.first_node_public_key()) do
# Schedule polling for authorized node
# This case may happen in case of process restart after crash
{:ok, %Node{authorized?: true, available?: true}} ->
summary_date =
next_date(summary_interval, DateTime.utc_now() |> DateTime.truncate(:second))
next_date(
Map.get(state_data, :summary_interval),
DateTime.utc_now() |> DateTime.truncate(:second)
)

PubSub.register_to_new_transaction_by_type(:oracle)

index = chain_size(summary_date)
Logger.info("Oracle Scheduler: Scheduled during init - (index: #{index})")

{:idle,
%{
# server data must have old key
args: nil,
polling_interval: polling_interval,
summary_interval: summary_interval,
summary_date: summary_date,
indexes: %{summary_date => index}
}, {:next_event, :internal, :schedule}}
new_state_data =
state_data
|> Map.put(:summary_date, summary_date)
|> Map.put(:indexes, %{summary_date => index})

{:idle, new_state_data, {:next_event, :internal, :schedule}}

_ ->
Logger.info("Oracle Scheduler: waiting for Node Update Message")

{:idle,
%{
# server data must have old key
args: nil,
polling_interval: polling_interval,
summary_interval: summary_interval,
indexes: %{}
}, []}
new_state_data =
state_data
|> Map.put(:indexes, %{})

{:idle, new_state_data, []}
end
end

Expand Down Expand Up @@ -157,9 +159,9 @@ defmodule Archethic.OracleChain.Scheduler do
{:next_state, :scheduled, new_data}
end

def handle_event(:info, {:node_up, :up}, :idle, _state = %{args: args}) do
{:idle, server_data, events} = start_scheduler(args)
{:keep_state, server_data, events}
def handle_event(:info, {:node_up, :up}, :idle, state_data) do
{:idle, new_state_data, events} = start_scheduler(state_data)
{:keep_state, new_state_data, events}
end

def handle_event(
Expand Down
31 changes: 15 additions & 16 deletions lib/archethic/reward/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,30 @@ defmodule Archethic.Reward.Scheduler do
end

def init(args) do
interval = Keyword.fetch!(args, :interval)
state_data = Map.put(%{}, :interval, interval)

case :persistent_term.get(:archethic_up, nil) do
nil ->
Logger.info(" Reward Scheduler: Waiting for Node to complete Bootstrap. ")

Archethic.PubSub.register_to_node_up()
{:ok, :idle, %{args: args}}
{:ok, :idle, state_data}

# wait for node up

:up ->
{state, server_data, events} = start_scheduler(args)
{:ok, state, server_data, events}
{state, new_state_data, events} = start_scheduler(state_data)
{:ok, state, new_state_data, events}
end
end

@doc """
Computers start parameters for the scheduler
"""
@spec start_scheduler(args :: keyword()) :: {atom(), map(), keyword()}
def start_scheduler(args) do
def start_scheduler(state_data) do
Logger.info("Reward Scheduler: Starting... ")

interval = Keyword.fetch!(args, :interval)
PubSub.register_to_node_update()

case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do
Expand All @@ -71,24 +72,22 @@ defmodule Archethic.Reward.Scheduler do
Logger.info("Reward Scheduler scheduled during init - (index: #{index})")

{:idle,
%{
args: nil,
interval: interval,
index: index,
next_address: Reward.next_address(index)
}, {:next_event, :internal, :schedule}}
state_data
|> Map.put(:index, index)
|> Map.put(:next_address, Reward.next_address(index)),
{:next_event, :internal, :schedule}}

_ ->
Logger.info("Reward Scheduler waiting for Node Update Message")

{:idle, %{args: nil, interval: interval}, []}
{:idle, state_data, []}
end
end

def handle_event(:info, {:node_up, :up}, :idle, _data = %{args: args}) do
def handle_event(:info, {:node_up, :up}, :idle, state_data) do
# Node is up start Scheduler
{:idle, server_data, events} = start_scheduler(args)
{:keep_state, server_data, events}
{:idle, new_state_data, events} = start_scheduler(state_data)
{:keep_state, new_state_data, events}
end

def handle_event(
Expand Down
29 changes: 13 additions & 16 deletions lib/archethic/shared_secrets/node_renewal_scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,26 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do

@doc false
def init(args) do
interval = Keyword.get(args, :interval)
state_data = Map.put(%{}, :interval, interval)

case :persistent_term.get(:archethic_up, nil) do
nil ->
Logger.info("Node Renewal Scheduler: Waiting for node to complete Bootstrap. ")

Archethic.PubSub.register_to_node_up()
{:ok, :idle, %{args: args}}
{:ok, :idle, state_data}

# wait for node ups
:up ->
{state, server_data, events} = start_scheduler(args)
{:ok, state, server_data, events}
{state, new_state_data, events} = start_scheduler(state_data)
{:ok, state, new_state_data, events}
end
end

@spec start_scheduler(keyword) ::
{:idle, %{args: nil, index: nil | non_neg_integer, interval: any},
[{:next_event, :internal, :schedule}]}
def start_scheduler(args) do
def start_scheduler(state_data) do
Logger.info("Node Renewal Scheduler: Starting... ")

interval = Keyword.get(args, :interval)
PubSub.register_to_node_update()

case Crypto.first_node_public_key() |> P2P.get_node_info() |> elem(1) do
Expand All @@ -74,16 +73,14 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do
Logger.info("Node Renewal Scheduler: Scheduled during init")

key_index = Crypto.number_of_node_shared_secrets_keys()
new_state_data = state_data |> Map.put(:index, key_index)

# server_data must have inital key
{:idle, %{args: nil, interval: interval, index: key_index},
[{:next_event, :internal, :schedule}]}
{:idle, new_state_data, [{:next_event, :internal, :schedule}]}

_ ->
Logger.info("Node Renewal Scheduler: Scheduler waiting for Node Update Message")

# server_data must have inital key
{:idle, %{args: nil, interval: interval}, []}
{:idle, state_data, []}
end
end

Expand All @@ -110,10 +107,10 @@ defmodule Archethic.SharedSecrets.NodeRenewalScheduler do
{:next_state, :scheduled, new_data}
end

def handle_event(:info, {:node_up, :up}, :idle, %{args: args}) do
def handle_event(:info, {:node_up, :up}, :idle, state_data) do
# Node is Up start Scheduler
{:idle, server_data, events} = start_scheduler(args)
{:keep_state, server_data, events}
{:idle, new_state_data, events} = start_scheduler(state_data)
{:keep_state, new_state_data, events}
end

def handle_event(
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ defmodule Archethic.MixProject do
# run single node
"dev.run": ["deps.get", "cmd mix dev.clean", "cmd iex -S mix"],
# Must be run before git push --no-verify | any(dialyzer issue)
"dev.checks": ["clean", "format", "compile", "credo", "cmd mix test", "dialyzer"],
"dev.checks": ["clean", "format", "compile", "credo", "cmd mix test --trace", "dialyzer"],
# paralele checks
"dev.pchecks": [" clean & format & compile & credo & test & dialyzer"],
# docker test-net with 3 nodes
Expand Down
18 changes: 14 additions & 4 deletions test/archethic/beacon_chain/subset_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ defmodule Archethic.BeaconChain.SubsetTest do

:ok = Subset.add_end_of_node_sync(subset, %EndOfNodeSync{public_key: public_key})

MockClient
|> stub(:send_message, fn
_, %NewBeaconTransaction{}, _ ->
{:ok, %Ok{}}
end)

assert %{
current_slot: %Slot{
end_of_node_synchronizations: [%EndOfNodeSync{public_key: ^public_key}]
Expand All @@ -53,8 +59,12 @@ defmodule Archethic.BeaconChain.SubsetTest do
test "new transaction summary is added to the slot and include the storage node confirmation",
%{subset: subset} do
MockClient
|> stub(:send_message, fn _, _txn = %TransactionSummary{}, _ ->
:ok
|> stub(:send_message, fn
_, _txn = %TransactionSummary{}, _ ->
{:ok, %Ok{}}

_, %NewBeaconTransaction{}, _ ->
{:ok, %Ok{}}
end)

start_supervised!({SummaryTimer, interval: "0 0 * * *"})
Expand Down Expand Up @@ -100,8 +110,8 @@ defmodule Archethic.BeaconChain.SubsetTest do
pid = start_supervised!({Subset, subset: subset})

MockClient
|> stub(:send_message, fn _, _txn = %NewBeaconTransaction{}, _ ->
:ok
|> stub(:send_message, fn _, %NewBeaconTransaction{}, _ ->
{:ok, %Ok{}}
end)

tx_time = DateTime.utc_now()
Expand Down
14 changes: 11 additions & 3 deletions test/archethic/mining/pending_transaction_validation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do

{:ok, pid} = Scheduler.start_link(interval: "0 * * * * *")

assert {:idle, %{args: [interval: "0 * * * * *"]}} = :sys.get_state(pid)
assert {:idle, %{interval: "0 * * * * *"}} = :sys.get_state(pid)

send(pid, {:node_up, :up})

Expand Down Expand Up @@ -431,7 +431,7 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do

{:ok, pid} = Scheduler.start_link(interval: "0 * * * * *")

assert {:idle, %{args: [interval: "0 * * * * *"]}} = :sys.get_state(pid)
assert {:idle, %{interval: "0 * * * * *"}} = :sys.get_state(pid)

send(pid, {:node_up, :up})

Expand Down Expand Up @@ -471,7 +471,7 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do

{:ok, pid} = Scheduler.start_link(interval: "0 * * * * *")

assert {:idle, %{args: [interval: "0 * * * * *"]}} = :sys.get_state(pid)
assert {:idle, %{interval: "0 * * * * *"}} = :sys.get_state(pid)

send(pid, {:node_up, :up})

Expand Down Expand Up @@ -814,6 +814,14 @@ defmodule Archethic.Mining.PendingTransactionValidationTest do
{pub, _} = Crypto.derive_keypair(tx_seed, 1)
address = Crypto.derive_address(pub)

MockCrypto
|> stub(:first_public_key, fn ->
{pub, _} =
Crypto.derive_keypair("seed_for_mining_transaction_valdiation_test", 0, :secp256r1)

pub
end)

NetworkLookup.set_network_pool_address(:crypto.strong_rand_bytes(32))

{:ok, pid} = Scheduler.start_link(interval: "0 * * * * *")
Expand Down
Loading

0 comments on commit c215943

Please sign in to comment.