diff --git a/lib/archethic/replication/transaction_validator.ex b/lib/archethic/replication/transaction_validator.ex index f3d844809a..4b570966ef 100644 --- a/lib/archethic/replication/transaction_validator.ex +++ b/lib/archethic/replication/transaction_validator.ex @@ -45,6 +45,26 @@ defmodule Archethic.Replication.TransactionValidator do | :invalid_pending_transaction | :invalid_inherit_constraints + @doc """ + Lists Validation errors + """ + def list_errors(), + do: [ + :invalid_atomic_commitment, + :invalid_node_election, + :invalid_proof_of_work, + :invalid_validation_stamp_signature, + :invalid_transaction_fee, + :invalid_transaction_movements, + :insufficient_funds, + :invalid_unspent_outputs, + :invalid_chain, + :invalid_transaction_with_inconsistencies, + :invalid_contract_acceptance, + :invalid_pending_transaction, + :invalid_inherit_constraints + ] + @doc """ Validate transaction with context diff --git a/lib/archethic/self_repair/notifier.ex b/lib/archethic/self_repair/notifier.ex index 671160bed1..59bbc0854f 100644 --- a/lib/archethic/self_repair/notifier.ex +++ b/lib/archethic/self_repair/notifier.ex @@ -160,9 +160,8 @@ defmodule Archethic.SelfRepair.Notifier do |> Stream.map(&new_storage_nodes(&1, unavailable_node_key)) |> Stream.scan(%{}, &map_node_and_address(&1, _acc = &2)) |> Stream.take(-1) + |> Enum.take(1) |> notify_nodes(genesis_address) - - :ok end @doc """ @@ -213,7 +212,7 @@ defmodule Archethic.SelfRepair.Notifier do end @doc """ - New election is carried out on the set of all authorized omiting unavailable_node_key. + New election is carried out on the set of all authorized omiting unavailable_node. The set of previous storage nodes is subtracted from the set of new storage nodes. """ @spec new_storage_nodes({binary(), list(Crypto.key())}, Crypto.key()) :: @@ -225,8 +224,10 @@ defmodule Archethic.SelfRepair.Notifier do P2P.authorized_nodes() |> Enum.reject(&(&1.first_public_key == unavailable_node_key)) ) + |> Enum.reject(&(&1.first_public_key in prev_storage_node)) + |> Enum.map(& &1.first_public_key) - {address, node_list -- prev_storage_node} + {address, node_list} end @doc """ diff --git a/lib/archethic/self_repair/notifier/impl.ex b/lib/archethic/self_repair/notifier/impl.ex index 0af9b58106..c654b44c3d 100644 --- a/lib/archethic/self_repair/notifier/impl.ex +++ b/lib/archethic/self_repair/notifier/impl.ex @@ -14,9 +14,10 @@ defmodule Archethic.SelfRepair.Notifier.Impl do require Logger - # @validation_errors Replication.TransactionValidator.list_error() @registry_name Archethic.SelfRepair.Notifier.RepairRegistry + @validation_errors Replication.TransactionValidator.list_errors() + @doc """ Returns Registry Name used to store pid and genesis_address. """ @@ -26,7 +27,13 @@ defmodule Archethic.SelfRepair.Notifier.Impl do Update Corresponding worker with new Request message. """ def update_worker(msg, pid) do - GenStateMachine.cast(pid, {:update_request, msg}) + case Process.alive?(pid) do + true -> + GenStateMachine.cast(pid, {:update_request, msg}) + + false -> + start_worker(msg) + end end @doc """ @@ -34,7 +41,7 @@ defmodule Archethic.SelfRepair.Notifier.Impl do """ @spec repair_in_progress?(genesis_address :: binary()) :: false | pid() def repair_in_progress?(genesis_address) do - case Registry.lookup(WorkflowRegistry, genesis_address) do + case Registry.lookup(@registry_name, genesis_address) do [{pid, _}] -> pid @@ -43,10 +50,7 @@ defmodule Archethic.SelfRepair.Notifier.Impl do end end - def unregister_repair_worker(genesis_address) do - Registry.unregister(@registry_name, genesis_address) - end - + @spec start_worker(map()) :: :ok def start_worker(opts) do DynamicSupervisor.start_child( RepairSupervisor, @@ -55,35 +59,76 @@ defmodule Archethic.SelfRepair.Notifier.Impl do opts } ) + + :ok end + @spec repair_chain(binary(), binary()) :: {:ok, :error} | {:ok, :continue} @doc """ - Fetches tx and repairs chain + Fetches tx and repairs chain. blocking code. """ - def repair_chain(address) do - try do - with false <- TransactionChain.transaction_exists?(address), - {:ok, node_list} <- get_nodes(address), - {:ok, txn} <- TransactionChain.fetch_transaction_remotely(address, node_list), - :ok <- Replication.validate_and_store_transaction_chain(txn) do - {:ok, :continue} - else - e -> - Logger.debug("RepairWorker: Error during repair #{address}", error: e) - {:ok, :continue} - - # {:error, e} when e in @validation_errors ->#:stop - # {:error, _e} ->#{:ok, :continue}# {:error, :empty}| # {:error,:transaction_already_exists} - # {:error, :transaction_not_exists}|{:error, :transaction_invalid}||# {:error, :network_issue} - end - rescue + def repair_chain(address, genesis_address) do + # try do + with false <- + TransactionChain.transaction_exists?(address), + {:ok, node_list} <- + get_nodes(address), + {:ok, txn} <- + TransactionChain.fetch_transaction_remotely(address, node_list), + :ok <- + Replication.validate_and_store_transaction_chain(txn) + |> tap(fn x -> IO.inspect(x, label: "=4=") end) do + log(:debug, "Successfull Repair", genesis_address, address, nil) + {:ok, :continue} + else + {:error, e = :empty} -> + log(:debug, "Election returned empty set, Omitting", genesis_address, address, e) + {:ok, :error} + + {:error, e} when e in @validation_errors -> + log(:warning, "Replication returned Validation Error", genesis_address, address, e) + {:ok, :error} + + {:error, e} + when e in [:transaction_not_exists, :transaction_invalid, :network_issue] -> + log(:warning, "Fetch Issue", genesis_address, address, e) + {:ok, :error} + + {:error, e = :transaction_already_exists} -> + log(:debug, "", genesis_address, address, e) + {:ok, :error} + e -> - {:ok, :continue} + log(:warning, "Unhandled error", genesis_address, address, e) + {:ok, :error} + end + + # rescue + # e -> + # IO.inspect(e) + # log(:warning, "Crash during Repair", genesis_address, address, e) + # {:ok, :crash} + # end + end + + def log(type, msg, genesis_address, address, e) do + gen_addr = Base.encode16(genesis_address) + last_addr = Base.encode16(address) + + case type do + :debug -> + Logger.debug( + "RepairWorker: gen_addr: #{gen_addr} , addr: #{last_addr} . #{msg}, Error: #{e}" + ) - Logger.warning("RepairWorker: Crash during Repair #{address}", crash: e) + :warning -> + Logger.warning( + "RepairWorker: gen_addr: #{gen_addr} , addr: #{last_addr}. #{msg} Error: #{e}" + ) end end + @spec get_nodes(binary) :: {:error, :empty} | {:ok, [Archethic.P2P.Node.t()]} def get_nodes(address) do nodes = address diff --git a/lib/archethic/self_repair/notifier/repair_worker.ex b/lib/archethic/self_repair/notifier/repair_worker.ex index a0f2913dd7..7bfd740ec2 100644 --- a/lib/archethic/self_repair/notifier/repair_worker.ex +++ b/lib/archethic/self_repair/notifier/repair_worker.ex @@ -34,7 +34,7 @@ defmodule Archethic.SelfRepair.Notifier.RepairWorker do } ) do [repair_addr | new_address_list] = address_list - {:ok, :continue} = Notifier.Impl.repair_chain(repair_addr) + {:ok, _} = Notifier.Impl.repair_chain(repair_addr, server_data.genesis_address) Logger.debug("RepairWorker: Repair sync_chain state") diff --git a/test/archethic/self_repair/notifier/impl_test..ex b/test/archethic/self_repair/notifier/impl_test..ex deleted file mode 100644 index 866244491c..0000000000 --- a/test/archethic/self_repair/notifier/impl_test..ex +++ /dev/null @@ -1,38 +0,0 @@ -defmodule Archethic.SelfRepair.Notifier.ImplTest do - @moduledoc false - use ExUnit.Case - - alias Archethic.{ - Crypto, - Election, - P2P, - P2P.Node, - TransactionChain, - RepairWorker, - RepairSupervisor, - Replication, - SelfRepair.Notifier.Impl - } - - alias SelfRepair.Notifier.Impl, as: NotifierImpl - - @registry_name NotifierImpl.registry_name() - - doctest NotifierImpl - - describe "RegistryOperations" do - setup do - pid = start_supervised!({Registry, name: @registry_name, keys: :unique, partitions: 1}) - - on_exit(fn -> - Process.exit(pid, :brutal_kill) - end) - - :ok - end - - test "General Flow" do - assert Process.whereis(@registry_name) - end - end -end diff --git a/test/archethic/self_repair/notifier/impl_test.exs b/test/archethic/self_repair/notifier/impl_test.exs new file mode 100644 index 0000000000..022fb93a26 --- /dev/null +++ b/test/archethic/self_repair/notifier/impl_test.exs @@ -0,0 +1,238 @@ +defmodule Archethic.SelfRepair.Notifier.ImplTest do + @moduledoc false + use ArchethicCase + + alias Archethic.{ + Crypto, + P2P, + P2P.Node, + SharedSecrets, + SelfRepair.Notifier.Impl, + TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput + } + + alias P2P.Message.{ + GetTransaction, + FirstAddress, + GetTransactionInputs, + TransactionInputList, + GetFirstAddress, + GetTransactionChain, + TransactionList + } + + alias Impl, as: NotifierImpl + doctest NotifierImpl + + @registry_name NotifierImpl.registry_name() + import Mox + + describe "RegistryOperations" do + setup do + case Process.whereis(@registry_name) do + nil -> + start_supervised!({Registry, name: @registry_name, keys: :unique, partitions: 1}) + + pid when is_pid(pid) -> + :ok + end + + :ok + end + + test "Naive Ops" do + me = self() + key = "gen_addr" + + Registry.register(@registry_name, key, []) + + assert [{^me, _}] = Registry.lookup(@registry_name, key) + + Registry.unregister(@registry_name, key) + + assert [] = Registry.lookup(@registry_name, key) + end + + test "repair_in_progress?()/1" do + key = "random_key" + refute NotifierImpl.repair_in_progress?(key) + + Registry.register(@registry_name, key, []) + Registry.unregister(@registry_name, key) + refute NotifierImpl.repair_in_progress?(key) + end + + test "Registry Must be Cleaned up automatically" do + key = "random_process" + refute NotifierImpl.repair_in_progress?(key) + + pid = + spawn(fn -> + Registry.register(@registry_name, "random_process", []) + + receive do + :exit -> + nil + end + end) + + Process.sleep(100) + assert NotifierImpl.repair_in_progress?(key) + + send(pid, :exit) + Process.sleep(100) + + refute NotifierImpl.repair_in_progress?(key) + end + end + + describe "repair_chain/2" do + setup do + pb_key2 = Crypto.derive_keypair("key22_random", 0) |> elem(0) + pb_key3 = Crypto.derive_keypair("key23_random", 0) |> elem(0) + + SharedSecrets.add_origin_public_key(:software, Crypto.first_node_public_key()) + + P2P.add_and_connect_node(%Node{ + first_public_key: Crypto.first_node_public_key(), + last_public_key: Crypto.last_node_public_key(), + authorized?: true, + available?: true, + authorization_date: DateTime.add(DateTime.utc_now(), -86_400, :second), + geo_patch: "AAA", + network_patch: "AAA", + enrollment_date: DateTime.add(DateTime.utc_now(), -86_400, :second), + reward_address: Crypto.derive_address(Crypto.last_node_public_key()) + }) + + %Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: pb_key2, + last_public_key: pb_key2, + available?: true, + authorized?: true, + geo_patch: "BBB", + network_patch: "BBB", + authorization_date: DateTime.add(DateTime.utc_now(), -86_400, :second), + reward_address: Crypto.derive_address(pb_key2), + enrollment_date: DateTime.add(DateTime.utc_now(), -86_400, :second) + } + |> P2P.add_and_connect_node() + + %Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: pb_key3, + last_public_key: pb_key3, + available?: true, + authorized?: true, + geo_patch: "BBB", + network_patch: "BBB", + authorization_date: DateTime.add(DateTime.utc_now(), -86_400 * 2, :second), + reward_address: Crypto.derive_address(pb_key3), + enrollment_date: DateTime.add(DateTime.utc_now(), -86_400 * 2, :second) + } + |> P2P.add_and_connect_node() + + :ok + end + + test " a" do + %{txn0: txn0, txn1: txn1, txn2: txn2} = factory_built_chain() + + addr0 = txn0.address + addr1 = txn1.address + addr2 = txn2.address + + MockDB + |> stub( + :transaction_exists?, + fn _ -> + false + end + ) + + MockClient + |> stub(:send_message, fn + _, %GetFirstAddress{address: _}, _ -> + {:ok, %FirstAddress{address: addr0}} + + _, %GetTransaction{address: ^addr2}, _ -> + {:ok, txn2} + + _, %GetTransactionInputs{}, _ -> + {:ok, + %TransactionInputList{ + inputs: [ + %UnspentOutput{ + from: "@Apoorv", + amount: 2_000_000_000, + type: :UCO, + timestamp: ~U[2022-10-09 08:39:10.463Z] + } + ] + }} + + _, %GetTransaction{address: ^addr0}, _ -> + {:ok, txn0} + + _, %GetTransaction{address: ^addr1}, _ -> + {:ok, txn1} + + _, %GetTransactionChain{}, _ -> + {:ok, + %TransactionList{ + transactions: [txn0, txn1, txn2], + paging_state: nil, + more?: false + }} + end) + + {:ok, :continue} = NotifierImpl.repair_chain(txn2.address, txn0.address) + end + + def factory_built_chain() do + alias Archethic.TransactionFactory + seed = "seed gta sa" + time = DateTime.utc_now() + timestamp2 = time |> DateTime.add(-5000) + timestamp1 = time |> DateTime.add(-5000 * 2) + timestamp0 = time |> DateTime.add(-5000 * 3) + + txn0 = + TransactionFactory.create_valid_chain( + [ + %UnspentOutput{ + from: "@Apoorv", + amount: 2_000_000_000, + type: :UCO, + timestamp: ~U[2022-10-09 08:39:10.463Z] + } + ], + seed: seed, + index: 0, + prev_txn: [], + timestamp: timestamp0 + ) + + txn1 = + TransactionFactory.create_valid_chain([], + seed: seed, + index: 1, + prev_txn: [txn0], + timestamp: timestamp1 + ) + + txn2 = + TransactionFactory.create_valid_chain([], + seed: seed, + index: 2, + prev_txn: [txn1], + timestamp: timestamp2 + ) + + %{txn0: txn0, txn1: txn1, txn2: txn2} + end + end +end diff --git a/test/archethic/self_repair/notifier/repair_worker_test.exs b/test/archethic/self_repair/notifier/repair_worker_test.exs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/archethic/self_repair/notifier_test.exs b/test/archethic/self_repair/notifier_test.exs index 6ba1508e03..e6a3ec5aff 100644 --- a/test/archethic/self_repair/notifier_test.exs +++ b/test/archethic/self_repair/notifier_test.exs @@ -86,8 +86,8 @@ defmodule Archethic.SelfRepair.NotifierTest do test "get_nodes_list/1", %{ node0: %Node{first_public_key: node0}, node1: %Node{first_public_key: node1}, - node2: %Node{first_public_key: node2}, - node3: _node3 + node2: %Node{first_public_key: node2} + # node3: %Node{first_public_key: node3} } do # Should return intended nodes from setup assert [^node0, ^node2, ^node1] = @@ -97,13 +97,24 @@ defmodule Archethic.SelfRepair.NotifierTest do Enum.map(Notifier.get_nodes_list(~U[2022-10-05 00:00:00Z]), & &1.first_public_key) end - test "with_down_shard?/2", %{node0: node0, node1: node1, node2: node2, node3: node3} do + test "with_down_shard?/2", %{ + node0: %Node{first_public_key: node0}, + node1: %Node{first_public_key: node1}, + node2: %Node{first_public_key: node2}, + node3: %Node{first_public_key: node3} + } do # prev shard must have unavailable node as to repair missing shard refute Notifier.with_down_shard?({"", [node0, node1, node2, node3]}, "node4") assert Notifier.with_down_shard?({"", [node0, node1, node2, node3]}, "node3") end - test "current_node_in_node_list?/2", %{node0: node0, node1: node1, node2: node2, node3: node3} do + test "current_node_in_node_list?/2", + %{ + node0: %Node{first_public_key: node0}, + node1: %Node{first_public_key: node1}, + node2: %Node{first_public_key: node2}, + node3: %Node{first_public_key: node3} + } do # prev shard must have unavailable node as to repair missing shard refute Notifier.current_node_in_node_list?({"", [node0, node1, node2, node3]}, "node4") @@ -114,25 +125,31 @@ defmodule Archethic.SelfRepair.NotifierTest do end test "new_storage_nodes/2", %{ - node0: node0, - node1: node1 = %Node{first_public_key: key1}, - node2: node2 = %Node{first_public_key: key2}, - node3: node3 + node0: %Node{first_public_key: node0}, + node1: %Node{first_public_key: node1}, + node2: %Node{first_public_key: node2}, + node3: %Node{first_public_key: node3} } do - assert {"abc", [%Node{first_public_key: ^key1}, %Node{first_public_key: ^key2}]} = - Notifier.new_storage_nodes({"abc", [node0]}, node3.first_public_key) + assert {"txn_addr", [^node2, ^node1]} = + Notifier.new_storage_nodes({"txn_addr", [node0]}, node3) end - test "map_node_and_address/2", %{ - node0: node0 = %Node{first_public_key: key0}, - node1: node1 = %Node{first_public_key: key1}, - node2: node2 = %Node{first_public_key: key2}, - node3: node3 = %Node{first_public_key: key3} - } do - acc = Notifier.map_node_and_address({"txn00", [node0, node1, node2, node3]}, %{}) + test "map_node_and_address/2", + %{ + node0: %Node{first_public_key: key0}, + node1: %Node{first_public_key: key1}, + node2: %Node{first_public_key: key2}, + node3: %Node{first_public_key: key3} + } do + assert %{ + ^key0 => "txn00", + ^key1 => "txn00", + ^key2 => "txn00", + ^key3 => "txn00" + } = acc = Notifier.map_node_and_address({"txn00", [key0, key1, key2, key3]}, %{}) assert %{^key0 => "txn01", ^key1 => "txn01", ^key2 => "txn01", ^key3 => "txn01"} = - Notifier.map_node_and_address({"txn01", [node0, node1, node2, node3]}, acc) + Notifier.map_node_and_address({"txn01", [key0, key1, key2, key3]}, acc) assert [ %{ @@ -142,17 +159,17 @@ defmodule Archethic.SelfRepair.NotifierTest do ^key3 => "txn01" } ] = - [{"txn00", [node0, node1, node2]}, {"txn01", [node2, node3]}] + [{"txn00", [key0, key1, key2]}, {"txn01", [key2, key3]}] |> Stream.scan(%{}, &Notifier.map_node_and_address(&1, _acc = &2)) |> Stream.take(-1) |> Enum.to_list() end test "notify_nodes/2", %{ - node0: node0 = %Node{first_public_key: key0}, - node1: node1 = %Node{first_public_key: key1}, - node2: node2 = %Node{first_public_key: key2}, - node3: node3 = %Node{first_public_key: key3} + node0: %Node{first_public_key: key0}, + node1: %Node{first_public_key: key1}, + node2: %Node{first_public_key: key2}, + node3: %Node{first_public_key: key3} } do me = self() assert :ok = Notifier.notify_nodes([], "genesis_address") diff --git a/test/support/transaction_factory.ex b/test/support/transaction_factory.ex index 76d791f095..cd5c43e6a2 100644 --- a/test/support/transaction_factory.ex +++ b/test/support/transaction_factory.ex @@ -272,4 +272,39 @@ defmodule Archethic.TransactionFactory do cross_validation_stamps: [cross_validation_stamp] } end + + def create_valid_chain( + inputs \\ [], + opts \\ [] + ) do + type = Keyword.get(opts, :type, :transfer) + seed = Keyword.get(opts, :seed, "seed") + index = Keyword.get(opts, :index) + content = Keyword.get(opts, :content, "") + prev_txn = Keyword.get(opts, :prev_txn, []) + timestamp = Keyword.get(opts, :timestamp) |> DateTime.truncate(:millisecond) + + txn = Transaction.new(type, %TransactionData{content: content}, seed, index) + + ledger_operations = + %LedgerOperations{ + fee: Fee.calculate(txn, 0.07), + transaction_movements: Transaction.get_movements(txn) + } + |> LedgerOperations.consume_inputs(txn.address, inputs, timestamp) + + validation_stamp = + %ValidationStamp{ + timestamp: timestamp, + proof_of_work: Crypto.origin_node_public_key(), + proof_of_election: Election.validation_nodes_election_seed_sorting(txn, timestamp), + proof_of_integrity: TransactionChain.proof_of_integrity([txn | prev_txn]), + ledger_operations: ledger_operations, + protocol_version: ArchethicCase.current_protocol_version() + } + |> ValidationStamp.sign() + + cross_validation_stamp = CrossValidationStamp.sign(%CrossValidationStamp{}, validation_stamp) + %{txn | validation_stamp: validation_stamp, cross_validation_stamps: [cross_validation_stamp]} + end end