From 9689fc5c8002687581b66a33780a8842a15d2981 Mon Sep 17 00:00:00 2001 From: Apoorv Date: Mon, 18 Jul 2022 17:36:17 +0530 Subject: [PATCH] replicate_txn_chain443 --- lib/archethic.ex | 27 ++++- .../replication/transaction_context.ex | 5 +- lib/archethic/transaction_chain.ex | 113 ++++++++++++++++-- test/archethic_test.exs | 96 ++++++++++++++- 4 files changed, 226 insertions(+), 15 deletions(-) diff --git a/lib/archethic.ex b/lib/archethic.ex index 54ef7cb6fa..95ec63d369 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -177,10 +177,14 @@ defmodule Archethic do end @doc """ - Retrieve a transaction chain based on an address from the closest nodes + Retrieve a transaction chain based on an address from the closest nodes. """ @spec get_transaction_chain(binary()) :: {:ok, list(Transaction.t())} | {:error, :network_issue} def get_transaction_chain(address) when is_binary(address) do + do_get_transaction_chain(TransactionChain.get_from_local(address), address) + end + + defp do_get_transaction_chain({false, nil}, address) do nodes = address |> Election.chain_storage_nodes(P2P.available_nodes()) @@ -201,6 +205,27 @@ defmodule Archethic do end end + defp do_get_transaction_chain({true, last_address}, address) when is_binary(last_address) do + chain_from_local = + Task.async(fn -> + case TransactionChain.fetch_chain_locally(last_address) do + {:ok, chain} -> chain + _ -> [] + end + end) + + chain_from_network = + Task.async(fn -> + case get_transaction_chain_by_paging_address(address, last_address) do + {:ok, chain} -> chain + _ -> [] + end + end) + + txn_list = Task.await(chain_from_local) ++ Task.await(chain_from_network) + {:ok, txn_list} + end + @doc """ Retrieve a transaction chain based on an address from the closest nodes by setting `paging_address as an offset address. diff --git a/lib/archethic/replication/transaction_context.ex b/lib/archethic/replication/transaction_context.ex index 1c62aa9729..a80b7db3b3 100644 --- a/lib/archethic/replication/transaction_context.ex +++ b/lib/archethic/replication/transaction_context.ex @@ -41,7 +41,10 @@ defmodule Archethic.Replication.TransactionContext do [] nodes -> - TransactionChain.stream_remotely(address, nodes) + case TransactionChain.get_from_local(address) do + {false, nil} -> TransactionChain.stream_remotely(address, nodes) + {true, last_address} -> TransactionChain.stream_remotely(address, nodes, last_address) + end end end diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index ff3b537476..1f38771b89 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -12,19 +12,23 @@ defmodule Archethic.TransactionChain do alias Archethic.P2P alias Archethic.P2P.Message alias Archethic.P2P.Message.Error - alias Archethic.P2P.Message.GetTransaction - alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.GetTransactionChainLength - alias Archethic.P2P.Message.GetLastTransactionAddress - alias Archethic.P2P.Message.GetTransactionInputs - alias Archethic.P2P.Message.GetUnspentOutputs - alias Archethic.P2P.Message.LastTransactionAddress + + alias Archethic.P2P.Node alias Archethic.P2P.Message.NotFound - alias Archethic.P2P.Message.TransactionChainLength alias Archethic.P2P.Message.TransactionList - alias Archethic.P2P.Message.TransactionInputList alias Archethic.P2P.Message.UnspentOutputList - alias Archethic.P2P.Node + alias Archethic.P2P.Message.TransactionInputList + alias Archethic.P2P.Message.TransactionChainLength + alias Archethic.P2P.Message.LastTransactionAddress + alias Archethic.P2P.Message.FirstAddress + + alias Archethic.P2P.Message.GetTransaction + alias Archethic.P2P.Message.GetFirstAddress + alias Archethic.P2P.Message.GetUnspentOutputs + alias Archethic.P2P.Message.GetTransactionChain + alias Archethic.P2P.Message.GetTransactionInputs + alias Archethic.P2P.Message.GetLastTransactionAddress + alias Archethic.P2P.Message.GetTransactionChainLength alias __MODULE__.MemTables.KOLedger alias __MODULE__.MemTables.PendingLedger @@ -805,4 +809,93 @@ defmodule Archethic.TransactionChain do {:error, :network_issue} end end + + @doc """ + Retrieve the last transaction address for a chain locally + It queries the the netowork for genesis address + """ + @spec get_last_address_locally(address :: binary()) :: + {:ok, binary()} | {:error, :not_found} | {:error, :network_issue} + def get_last_address_locally(address) when is_binary(address) do + genesis_address = + case get_genesis_transaction_address(address) do + {:ok, genesis_address} -> genesis_address + _ -> address + end + + case get_last_address(genesis_address) do + ^genesis_address -> {:error, :not_found} + last_address -> {:ok, last_address} + end + end + + @doc """ + Retrieve the transaction genesis address for a chain from the closest nodes + """ + @spec get_genesis_transaction_address(address :: binary()) :: + {:ok, binary()} + | {:error, :network_issue} + def get_genesis_transaction_address(address) when is_binary(address) do + resolve_genesis_address(address) + end + + @spec fetch_chain_locally(address :: binary()) :: {:ok, [Transaction.t()]} | {:error, :db_error} + def fetch_chain_locally(address) when is_binary(address) do + try do + {:ok, fetch_chain_db(get(address), [])} + catch + _ -> + Logger.debug("Error fetching chain locally {:error, :db_error}") + {:error, :db_error} + end + end + + def fetch_chain_db({chain, false, _}, acc), do: Enum.reverse(acc ++ chain) + + def fetch_chain_db({chain, true, paging_address}, acc) do + fetch_chain_db(get(paging_address, [], paging_address: paging_address), acc ++ chain) + end + + @spec get_from_local(binary) :: {boolean(), binary | nil} + def get_from_local(address) when is_binary(address) do + case get_last_address_locally(address) do + {:ok, last_address} -> {true, last_address} + _ -> {false, nil} + end + end + + @doc """ + Retrieve the genesis address of a chain + """ + @spec resolve_genesis_address(binary()) :: {:ok, binary()} | {:error, :network_issue} + def resolve_genesis_address(address) when is_binary(address) do + nodes = + address + |> Election.chain_storage_nodes(P2P.available_nodes()) + |> P2P.nearest_nodes() + |> Enum.filter(&Node.locally_available?/1) + + case fetch_genesis_address_remotely(address, nodes) do + {:ok, genesis_address} -> + {:ok, genesis_address} + + _e -> + {:error, :network_issue} + end + end + + @doc """ + Fetch the genesis|first address remotely + """ + @spec fetch_genesis_address_remotely(binary(), list(Node.t())) :: + {:ok, binary()} | {:error, :network_issue} + def fetch_genesis_address_remotely(address, nodes) when is_binary(address) do + case P2P.quorum_read(nodes, %GetFirstAddress{address: address}) do + {:ok, %FirstAddress{address: genesis_address}} -> + {:ok, genesis_address} + + _ -> + {:error, :network_issue} + end + end end diff --git a/test/archethic_test.exs b/test/archethic_test.exs index 7fab741155..4d54f06658 100644 --- a/test/archethic_test.exs +++ b/test/archethic_test.exs @@ -15,6 +15,9 @@ defmodule ArchethicTest do alias Archethic.P2P.Message.GetTransactionChain alias Archethic.P2P.Message.GetTransactionChainLength alias Archethic.P2P.Message.GetTransactionInputs + alias Archethic.P2P.Message.GetFirstAddress + + alias Archethic.P2P.Message.FirstAddress alias Archethic.P2P.Message.LastTransactionAddress alias Archethic.P2P.Message.NotFound alias Archethic.P2P.Message.Ok @@ -295,6 +298,16 @@ defmodule ArchethicTest do authorization_date: DateTime.utc_now() }) + MockClient + |> expect(:send_message, fn _node, %GetFirstAddress{}, _timeout -> + {:ok, %FirstAddress{address: "@Alice0"}} + end) + + MockDB + |> stub(:get_last_chain_address, fn address -> + address + end) + MockClient |> stub(:send_message, fn _, %GetTransactionChain{}, _ -> @@ -302,7 +315,75 @@ defmodule ArchethicTest do %TransactionList{ transactions: [ %Transaction{address: "@Alice2"}, - %Transaction{address: "@Alice1"} + %Transaction{address: "@Alice1"}, + %Transaction{address: "@Alice0"} + ] + }} + + _, %GetTransactionChainLength{}, _ -> + %TransactionChainLength{length: 1} + end) + + assert {:ok, + [ + %Transaction{address: "@Alice2"}, + %Transaction{address: "@Alice1"}, + %Transaction{address: "@Alice0"} + ]} = Archethic.get_transaction_chain("@Alice2") + end + + test "should get_transaction_chain from local db and remaining from network " do + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: Crypto.last_node_public_key(), + last_public_key: Crypto.last_node_public_key(), + network_patch: "AAA", + geo_patch: "AAA" + }) + + P2P.add_and_connect_node(%Node{ + ip: {127, 0, 0, 1}, + port: 3000, + first_public_key: "key1", + last_public_key: "key1", + network_patch: "AAA", + geo_patch: "AAA", + available?: true, + authorized?: true, + authorization_date: DateTime.utc_now() + }) + + MockClient + |> expect(:send_message, fn _node, %GetFirstAddress{}, _timeout -> + {:ok, %FirstAddress{address: "@Alice0"}} + end) + + MockDB + |> stub(:get_last_chain_address, fn address -> + "@Alice5" + end) + + MockDB + |> stub(:get_transaction_chain, fn address, [], [] -> + {[ + %Transaction{address: "@Alice0"}, + %Transaction{address: "@Alice1"}, + %Transaction{address: "@Alice2"}, + %Transaction{address: "@Alice3"}, + %Transaction{address: "@Alice4"}, + %Transaction{address: "@Alice5"} + ], false, nil} + end) + + MockClient + |> stub(:send_message, fn + _, %GetTransactionChain{address: "@Alice2", paging_state: "@Alice5"}, _ -> + {:ok, + %TransactionList{ + transactions: [ + %Transaction{address: "@Alice6"}, + %Transaction{address: "@Alice7"} ] }} @@ -310,8 +391,17 @@ defmodule ArchethicTest do %TransactionChainLength{length: 1} end) - assert {:ok, [%Transaction{address: "@Alice2"}, %Transaction{address: "@Alice1"}]} = - Archethic.get_transaction_chain("@Alice2") + assert {:ok, + [ + %Transaction{address: "@Alice5"}, + %Transaction{address: "@Alice4"}, + %Transaction{address: "@Alice3"}, + %Transaction{address: "@Alice2"}, + %Transaction{address: "@Alice1"}, + %Transaction{address: "@Alice0"}, + %Transaction{address: "@Alice6"}, + %Transaction{address: "@Alice7"} + ]} = Archethic.get_transaction_chain("@Alice2") end end