Skip to content

Commit

Permalink
impl tests
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorv-2204 committed Nov 7, 2022
1 parent 8f7c363 commit 2ae0aaa
Show file tree
Hide file tree
Showing 9 changed files with 411 additions and 93 deletions.
20 changes: 20 additions & 0 deletions lib/archethic/replication/transaction_validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,26 @@ defmodule Archethic.Replication.TransactionValidator do
| :invalid_pending_transaction
| :invalid_inherit_constraints

@doc """
Lists Validation errors
"""
def list_errors(),
do: [
:invalid_atomic_commitment,
:invalid_node_election,
:invalid_proof_of_work,
:invalid_validation_stamp_signature,
:invalid_transaction_fee,
:invalid_transaction_movements,
:insufficient_funds,
:invalid_unspent_outputs,
:invalid_chain,
:invalid_transaction_with_inconsistencies,
:invalid_contract_acceptance,
:invalid_pending_transaction,
:invalid_inherit_constraints
]

@doc """
Validate transaction with context
Expand Down
9 changes: 5 additions & 4 deletions lib/archethic/self_repair/notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,8 @@ defmodule Archethic.SelfRepair.Notifier do
|> Stream.map(&new_storage_nodes(&1, unavailable_node_key))
|> Stream.scan(%{}, &map_node_and_address(&1, _acc = &2))
|> Stream.take(-1)
|> Enum.take(1)
|> notify_nodes(genesis_address)

:ok
end

@doc """
Expand Down Expand Up @@ -213,7 +212,7 @@ defmodule Archethic.SelfRepair.Notifier do
end

@doc """
New election is carried out on the set of all authorized omiting unavailable_node_key.
New election is carried out on the set of all authorized omiting unavailable_node.
The set of previous storage nodes is subtracted from the set of new storage nodes.
"""
@spec new_storage_nodes({binary(), list(Crypto.key())}, Crypto.key()) ::
Expand All @@ -225,8 +224,10 @@ defmodule Archethic.SelfRepair.Notifier do
P2P.authorized_nodes()
|> Enum.reject(&(&1.first_public_key == unavailable_node_key))
)
|> Enum.reject(&(&1.first_public_key in prev_storage_node))
|> Enum.map(& &1.first_public_key)

{address, node_list -- prev_storage_node}
{address, node_list}
end

@doc """
Expand Down
99 changes: 72 additions & 27 deletions lib/archethic/self_repair/notifier/impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ defmodule Archethic.SelfRepair.Notifier.Impl do

require Logger

# @validation_errors Replication.TransactionValidator.list_error()
@registry_name Archethic.SelfRepair.Notifier.RepairRegistry

@validation_errors Replication.TransactionValidator.list_errors()

@doc """
Returns Registry Name used to store pid and genesis_address.
"""
Expand All @@ -26,15 +27,21 @@ defmodule Archethic.SelfRepair.Notifier.Impl do
Update Corresponding worker with new Request message.
"""
def update_worker(msg, pid) do
GenStateMachine.cast(pid, {:update_request, msg})
case Process.alive?(pid) do
true ->
GenStateMachine.cast(pid, {:update_request, msg})

false ->
start_worker(msg)
end
end

@doc """
Return true if the Chain repair is OnGoing
"""
@spec repair_in_progress?(genesis_address :: binary()) :: false | pid()
def repair_in_progress?(genesis_address) do
case Registry.lookup(WorkflowRegistry, genesis_address) do
case Registry.lookup(@registry_name, genesis_address) do
[{pid, _}] ->
pid

Expand All @@ -43,10 +50,7 @@ defmodule Archethic.SelfRepair.Notifier.Impl do
end
end

def unregister_repair_worker(genesis_address) do
Registry.unregister(@registry_name, genesis_address)
end

@spec start_worker(map()) :: :ok
def start_worker(opts) do
DynamicSupervisor.start_child(
RepairSupervisor,
Expand All @@ -55,35 +59,76 @@ defmodule Archethic.SelfRepair.Notifier.Impl do
opts
}
)

:ok
end

@spec repair_chain(binary(), binary()) :: {:ok, :error} | {:ok, :continue}
@doc """
Fetches tx and repairs chain
Fetches tx and repairs chain. blocking code.
"""
def repair_chain(address) do
try do
with false <- TransactionChain.transaction_exists?(address),
{:ok, node_list} <- get_nodes(address),
{:ok, txn} <- TransactionChain.fetch_transaction_remotely(address, node_list),
:ok <- Replication.validate_and_store_transaction_chain(txn) do
{:ok, :continue}
else
e ->
Logger.debug("RepairWorker: Error during repair #{address}", error: e)
{:ok, :continue}

# {:error, e} when e in @validation_errors ->#:stop
# {:error, _e} ->#{:ok, :continue}# {:error, :empty}| # {:error,:transaction_already_exists}
# {:error, :transaction_not_exists}|{:error, :transaction_invalid}||# {:error, :network_issue}
end
rescue
def repair_chain(address, genesis_address) do
# try do
with false <-
TransactionChain.transaction_exists?(address),
{:ok, node_list} <-
get_nodes(address),
{:ok, txn} <-
TransactionChain.fetch_transaction_remotely(address, node_list),
:ok <-
Replication.validate_and_store_transaction_chain(txn)
|> tap(fn x -> IO.inspect(x, label: "=4=") end) do
log(:debug, "Successfull Repair", genesis_address, address, nil)
{:ok, :continue}
else
{:error, e = :empty} ->
log(:debug, "Election returned empty set, Omitting", genesis_address, address, e)
{:ok, :error}

{:error, e} when e in @validation_errors ->
log(:warning, "Replication returned Validation Error", genesis_address, address, e)
{:ok, :error}

{:error, e}
when e in [:transaction_not_exists, :transaction_invalid, :network_issue] ->
log(:warning, "Fetch Issue", genesis_address, address, e)
{:ok, :error}

{:error, e = :transaction_already_exists} ->
log(:debug, "", genesis_address, address, e)
{:ok, :error}

e ->
{:ok, :continue}
log(:warning, "Unhandled error", genesis_address, address, e)
{:ok, :error}
end

# rescue
# e ->
# IO.inspect(e)
# log(:warning, "Crash during Repair", genesis_address, address, e)
# {:ok, :crash}
# end
end

def log(type, msg, genesis_address, address, e) do
gen_addr = Base.encode16(genesis_address)
last_addr = Base.encode16(address)

case type do
:debug ->
Logger.debug(
"RepairWorker: gen_addr: #{gen_addr} , addr: #{last_addr} . #{msg}, Error: #{e}"
)

Logger.warning("RepairWorker: Crash during Repair #{address}", crash: e)
:warning ->
Logger.warning(
"RepairWorker: gen_addr: #{gen_addr} , addr: #{last_addr}. #{msg} Error: #{e}"
)
end
end

@spec get_nodes(binary) :: {:error, :empty} | {:ok, [Archethic.P2P.Node.t()]}
def get_nodes(address) do
nodes =
address
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/self_repair/notifier/repair_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule Archethic.SelfRepair.Notifier.RepairWorker do
}
) do
[repair_addr | new_address_list] = address_list
{:ok, :continue} = Notifier.Impl.repair_chain(repair_addr)
{:ok, _} = Notifier.Impl.repair_chain(repair_addr, server_data.genesis_address)

Logger.debug("RepairWorker: Repair sync_chain state")

Expand Down
38 changes: 0 additions & 38 deletions test/archethic/self_repair/notifier/impl_test..ex

This file was deleted.

Loading

0 comments on commit 2ae0aaa

Please sign in to comment.