Skip to content

Commit

Permalink
replicate_txn_chain443
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorv-2204 committed Jul 18, 2022
1 parent 14f5dae commit 9689fc5
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 15 deletions.
27 changes: 26 additions & 1 deletion lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,14 @@ defmodule Archethic do
end

@doc """
Retrieve a transaction chain based on an address from the closest nodes
Retrieve a transaction chain based on an address from the closest nodes.
"""
@spec get_transaction_chain(binary()) :: {:ok, list(Transaction.t())} | {:error, :network_issue}
def get_transaction_chain(address) when is_binary(address) do
do_get_transaction_chain(TransactionChain.get_from_local(address), address)
end

defp do_get_transaction_chain({false, nil}, address) do
nodes =
address
|> Election.chain_storage_nodes(P2P.available_nodes())
Expand All @@ -201,6 +205,27 @@ defmodule Archethic do
end
end

defp do_get_transaction_chain({true, last_address}, address) when is_binary(last_address) do
chain_from_local =
Task.async(fn ->
case TransactionChain.fetch_chain_locally(last_address) do
{:ok, chain} -> chain
_ -> []
end
end)

chain_from_network =
Task.async(fn ->
case get_transaction_chain_by_paging_address(address, last_address) do
{:ok, chain} -> chain
_ -> []
end
end)

txn_list = Task.await(chain_from_local) ++ Task.await(chain_from_network)
{:ok, txn_list}
end

@doc """
Retrieve a transaction chain based on an address from the closest nodes
by setting `paging_address as an offset address.
Expand Down
5 changes: 4 additions & 1 deletion lib/archethic/replication/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ defmodule Archethic.Replication.TransactionContext do
[]

nodes ->
TransactionChain.stream_remotely(address, nodes)
case TransactionChain.get_from_local(address) do
{false, nil} -> TransactionChain.stream_remotely(address, nodes)
{true, last_address} -> TransactionChain.stream_remotely(address, nodes, last_address)
end
end
end

Expand Down
113 changes: 103 additions & 10 deletions lib/archethic/transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@ defmodule Archethic.TransactionChain do
alias Archethic.P2P
alias Archethic.P2P.Message
alias Archethic.P2P.Message.Error
alias Archethic.P2P.Message.GetTransaction
alias Archethic.P2P.Message.GetTransactionChain
alias Archethic.P2P.Message.GetTransactionChainLength
alias Archethic.P2P.Message.GetLastTransactionAddress
alias Archethic.P2P.Message.GetTransactionInputs
alias Archethic.P2P.Message.GetUnspentOutputs
alias Archethic.P2P.Message.LastTransactionAddress

alias Archethic.P2P.Node
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Message.TransactionChainLength
alias Archethic.P2P.Message.TransactionList
alias Archethic.P2P.Message.TransactionInputList
alias Archethic.P2P.Message.UnspentOutputList
alias Archethic.P2P.Node
alias Archethic.P2P.Message.TransactionInputList
alias Archethic.P2P.Message.TransactionChainLength
alias Archethic.P2P.Message.LastTransactionAddress
alias Archethic.P2P.Message.FirstAddress

alias Archethic.P2P.Message.GetTransaction
alias Archethic.P2P.Message.GetFirstAddress
alias Archethic.P2P.Message.GetUnspentOutputs
alias Archethic.P2P.Message.GetTransactionChain
alias Archethic.P2P.Message.GetTransactionInputs
alias Archethic.P2P.Message.GetLastTransactionAddress
alias Archethic.P2P.Message.GetTransactionChainLength

alias __MODULE__.MemTables.KOLedger
alias __MODULE__.MemTables.PendingLedger
Expand Down Expand Up @@ -805,4 +809,93 @@ defmodule Archethic.TransactionChain do
{:error, :network_issue}
end
end

@doc """
Retrieve the last transaction address for a chain locally
It queries the the netowork for genesis address
"""
@spec get_last_address_locally(address :: binary()) ::
{:ok, binary()} | {:error, :not_found} | {:error, :network_issue}
def get_last_address_locally(address) when is_binary(address) do
genesis_address =
case get_genesis_transaction_address(address) do
{:ok, genesis_address} -> genesis_address
_ -> address
end

case get_last_address(genesis_address) do
^genesis_address -> {:error, :not_found}
last_address -> {:ok, last_address}
end
end

@doc """
Retrieve the transaction genesis address for a chain from the closest nodes
"""
@spec get_genesis_transaction_address(address :: binary()) ::
{:ok, binary()}
| {:error, :network_issue}
def get_genesis_transaction_address(address) when is_binary(address) do
resolve_genesis_address(address)
end

@spec fetch_chain_locally(address :: binary()) :: {:ok, [Transaction.t()]} | {:error, :db_error}
def fetch_chain_locally(address) when is_binary(address) do
try do
{:ok, fetch_chain_db(get(address), [])}
catch
_ ->
Logger.debug("Error fetching chain locally {:error, :db_error}")
{:error, :db_error}
end
end

def fetch_chain_db({chain, false, _}, acc), do: Enum.reverse(acc ++ chain)

def fetch_chain_db({chain, true, paging_address}, acc) do
fetch_chain_db(get(paging_address, [], paging_address: paging_address), acc ++ chain)
end

@spec get_from_local(binary) :: {boolean(), binary | nil}
def get_from_local(address) when is_binary(address) do
case get_last_address_locally(address) do
{:ok, last_address} -> {true, last_address}
_ -> {false, nil}
end
end

@doc """
Retrieve the genesis address of a chain
"""
@spec resolve_genesis_address(binary()) :: {:ok, binary()} | {:error, :network_issue}
def resolve_genesis_address(address) when is_binary(address) do
nodes =
address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

case fetch_genesis_address_remotely(address, nodes) do
{:ok, genesis_address} ->
{:ok, genesis_address}

_e ->
{:error, :network_issue}
end
end

@doc """
Fetch the genesis|first address remotely
"""
@spec fetch_genesis_address_remotely(binary(), list(Node.t())) ::
{:ok, binary()} | {:error, :network_issue}
def fetch_genesis_address_remotely(address, nodes) when is_binary(address) do
case P2P.quorum_read(nodes, %GetFirstAddress{address: address}) do
{:ok, %FirstAddress{address: genesis_address}} ->
{:ok, genesis_address}

_ ->
{:error, :network_issue}
end
end
end
96 changes: 93 additions & 3 deletions test/archethic_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ defmodule ArchethicTest do
alias Archethic.P2P.Message.GetTransactionChain
alias Archethic.P2P.Message.GetTransactionChainLength
alias Archethic.P2P.Message.GetTransactionInputs
alias Archethic.P2P.Message.GetFirstAddress

alias Archethic.P2P.Message.FirstAddress
alias Archethic.P2P.Message.LastTransactionAddress
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Message.Ok
Expand Down Expand Up @@ -295,23 +298,110 @@ defmodule ArchethicTest do
authorization_date: DateTime.utc_now()
})

MockClient
|> expect(:send_message, fn _node, %GetFirstAddress{}, _timeout ->
{:ok, %FirstAddress{address: "@Alice0"}}
end)

MockDB
|> stub(:get_last_chain_address, fn address ->
address
end)

MockClient
|> stub(:send_message, fn
_, %GetTransactionChain{}, _ ->
{:ok,
%TransactionList{
transactions: [
%Transaction{address: "@Alice2"},
%Transaction{address: "@Alice1"}
%Transaction{address: "@Alice1"},
%Transaction{address: "@Alice0"}
]
}}

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}
end)

assert {:ok,
[
%Transaction{address: "@Alice2"},
%Transaction{address: "@Alice1"},
%Transaction{address: "@Alice0"}
]} = Archethic.get_transaction_chain("@Alice2")
end

test "should get_transaction_chain from local db and remaining from network " do
P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3000,
first_public_key: Crypto.last_node_public_key(),
last_public_key: Crypto.last_node_public_key(),
network_patch: "AAA",
geo_patch: "AAA"
})

P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3000,
first_public_key: "key1",
last_public_key: "key1",
network_patch: "AAA",
geo_patch: "AAA",
available?: true,
authorized?: true,
authorization_date: DateTime.utc_now()
})

MockClient
|> expect(:send_message, fn _node, %GetFirstAddress{}, _timeout ->
{:ok, %FirstAddress{address: "@Alice0"}}
end)

MockDB
|> stub(:get_last_chain_address, fn address ->
"@Alice5"
end)

MockDB
|> stub(:get_transaction_chain, fn address, [], [] ->
{[
%Transaction{address: "@Alice0"},
%Transaction{address: "@Alice1"},
%Transaction{address: "@Alice2"},
%Transaction{address: "@Alice3"},
%Transaction{address: "@Alice4"},
%Transaction{address: "@Alice5"}
], false, nil}
end)

MockClient
|> stub(:send_message, fn
_, %GetTransactionChain{address: "@Alice2", paging_state: "@Alice5"}, _ ->
{:ok,
%TransactionList{
transactions: [
%Transaction{address: "@Alice6"},
%Transaction{address: "@Alice7"}
]
}}

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}
end)

assert {:ok, [%Transaction{address: "@Alice2"}, %Transaction{address: "@Alice1"}]} =
Archethic.get_transaction_chain("@Alice2")
assert {:ok,
[
%Transaction{address: "@Alice5"},
%Transaction{address: "@Alice4"},
%Transaction{address: "@Alice3"},
%Transaction{address: "@Alice2"},
%Transaction{address: "@Alice1"},
%Transaction{address: "@Alice0"},
%Transaction{address: "@Alice6"},
%Transaction{address: "@Alice7"}
]} = Archethic.get_transaction_chain("@Alice2")
end
end

Expand Down

0 comments on commit 9689fc5

Please sign in to comment.