Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replicate only missing transactions #443 #453

Merged
merged 3 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 41 additions & 14 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ defmodule Archethic do

alias __MODULE__.P2P

alias __MODULE__.DB

alias __MODULE__.P2P.Message.Balance
alias __MODULE__.P2P.Message.GetBalance
alias __MODULE__.P2P.Message.NewTransaction
Expand Down Expand Up @@ -177,7 +179,7 @@ defmodule Archethic do
end

@doc """
Retrieve a transaction chain based on an address from the closest nodes
Retrieve a transaction chain based on an address from the closest nodes.
"""
@spec get_transaction_chain(binary()) :: {:ok, list(Transaction.t())} | {:error, :network_issue}
def get_transaction_chain(address) when is_binary(address) do
Expand All @@ -188,13 +190,23 @@ defmodule Archethic do
|> Enum.filter(&Node.locally_available?/1)

try do
chain =
address
|> TransactionChain.stream_remotely(nodes)
|> Enum.to_list()
|> List.flatten()

{:ok, chain}
{local_chain, paging_address} =
case TransactionChain.get_last_local_address(address) do
nil -> {[], nil}
last_address -> {TransactionChain.get_locally(last_address), last_address}
end

remote_chain =
if address != paging_address do
address
|> TransactionChain.stream_remotely(nodes, paging_address)
|> Enum.to_list()
|> List.flatten()
else
[]
end

{:ok, local_chain ++ remote_chain}
catch
_ ->
{:error, :network_issue}
Expand All @@ -215,12 +227,27 @@ defmodule Archethic do
|> Enum.filter(&Node.locally_available?/1)

try do
chain_page =
address
|> TransactionChain.stream_remotely(nodes, paging_address)
|> Enum.at(0)

{:ok, chain_page}
{local_chain, paging_address} =
with true <- DB.transaction_exists?(paging_address),
last_address when last_address != nil <-
TransactionChain.get_last_local_address(address),
true <- last_address != paging_address do
{TransactionChain.get_locally(last_address, paging_address), last_address}
else
_ -> {[], paging_address}
end

remote_chain =
if paging_address != address do
address
|> TransactionChain.stream_remotely(nodes, paging_address)
|> Enum.to_list()
|> List.flatten()
else
[]
end

{:ok, local_chain ++ remote_chain}
catch
_ ->
{:error, :network_issue}
Expand Down
9 changes: 2 additions & 7 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1256,13 +1256,8 @@ defmodule Archethic.P2P.Message do
end

def process(%GetFirstAddress{address: address}) do
case TransactionChain.get_first_transaction(address, [:address]) do
{:ok, %Transaction{address: address}} ->
%FirstAddress{address: address}

{:error, :transaction_not_exists} ->
%NotFound{}
end
genesis_address = TransactionChain.get_genesis_address(address)
%FirstAddress{address: genesis_address}
end

def process(%GetLastTransactionAddress{address: address, timestamp: timestamp}) do
Expand Down
8 changes: 7 additions & 1 deletion lib/archethic/replication/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ defmodule Archethic.Replication.TransactionContext do
[]

nodes ->
TransactionChain.stream_remotely(address, nodes)
paging_address = TransactionChain.get_last_local_address(address)

if paging_address != address do
TransactionChain.stream_remotely(address, nodes, paging_address)
else
[]
end
end
end

Expand Down
76 changes: 66 additions & 10 deletions lib/archethic/transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@ defmodule Archethic.TransactionChain do
alias Archethic.P2P
alias Archethic.P2P.Message
alias Archethic.P2P.Message.Error
alias Archethic.P2P.Message.GetTransaction
alias Archethic.P2P.Message.GetTransactionChain
alias Archethic.P2P.Message.GetTransactionChainLength
alias Archethic.P2P.Message.GetLastTransactionAddress
alias Archethic.P2P.Message.GetTransactionInputs
alias Archethic.P2P.Message.GetUnspentOutputs
alias Archethic.P2P.Message.LastTransactionAddress

alias Archethic.P2P.Node
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Message.TransactionChainLength
alias Archethic.P2P.Message.TransactionList
alias Archethic.P2P.Message.TransactionInputList
alias Archethic.P2P.Message.UnspentOutputList
alias Archethic.P2P.Node
alias Archethic.P2P.Message.TransactionInputList
alias Archethic.P2P.Message.TransactionChainLength
alias Archethic.P2P.Message.LastTransactionAddress
alias Archethic.P2P.Message.FirstAddress

alias Archethic.P2P.Message.GetTransaction
alias Archethic.P2P.Message.GetFirstAddress
alias Archethic.P2P.Message.GetUnspentOutputs
alias Archethic.P2P.Message.GetTransactionChain
alias Archethic.P2P.Message.GetTransactionInputs
alias Archethic.P2P.Message.GetLastTransactionAddress
alias Archethic.P2P.Message.GetTransactionChainLength

alias __MODULE__.MemTables.KOLedger
alias __MODULE__.MemTables.PendingLedger
Expand Down Expand Up @@ -246,6 +250,12 @@ defmodule Archethic.TransactionChain do
|> get_transaction(fields)
end

@doc """
Get the genesis address from a given chain address
"""
@spec get_genesis_address(binary()) :: binary()
defdelegate get_genesis_address(address), to: DB, as: :get_first_chain_address

@doc """
Produce a proof of integrity for a given chain.

Expand Down Expand Up @@ -805,4 +815,50 @@ defmodule Archethic.TransactionChain do
{:error, :network_issue}
end
end

@doc """
Retrieve the last transaction address for a chain stored locally
It queries the the network for genesis address
"""
@spec get_last_local_address(address :: binary()) :: binary() | nil
def get_last_local_address(address) when is_binary(address) do
case fetch_genesis_address_remotely(address) do
{:ok, genesis_address} ->
case get_last_address(genesis_address) do
^genesis_address -> nil
last_address -> last_address
end

_ ->
nil
end
end

defp fetch_genesis_address_remotely(address) when is_binary(address) do
nodes =
address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> P2P.nearest_nodes()
|> Enum.filter(&Node.locally_available?/1)

case P2P.quorum_read(nodes, %GetFirstAddress{address: address}) do
{:ok, %FirstAddress{address: genesis_address}} ->
{:ok, genesis_address}

_ ->
{:error, :network_issue}
end
end

@spec get_locally(address :: binary(), paging_address :: binary() | nil) ::
Enumerable.t() | list(Transaction.t())
def get_locally(address, paging_address \\ nil) when is_binary(address) do
fetch_chain_db(get(address, [], paging_state: paging_address), [])
end

def fetch_chain_db({chain, false, _}, acc), do: acc ++ chain

def fetch_chain_db({chain, true, paging_address}, acc) do
fetch_chain_db(get(paging_address, [], paging_state: paging_address), acc ++ chain)
end
end
20 changes: 15 additions & 5 deletions test/archethic/beacon_chain/subset_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ defmodule Archethic.BeaconChain.SubsetTest do
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Message.Ping
alias Archethic.P2P.Node
# alias Archethic.P2P.Message.GetFirstAddress
# alias Archethic.P2P.Message.NotFound

alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionData
Expand Down Expand Up @@ -288,6 +290,9 @@ defmodule Archethic.BeaconChain.SubsetTest do
_, %Ping{}, _ ->
Process.sleep(10)
{:ok, %Ok{}}

_, %NewBeaconTransaction{}, _ ->
{:ok, %Ok{}}
end)

MockDB
Expand Down Expand Up @@ -405,11 +410,16 @@ defmodule Archethic.BeaconChain.SubsetTest do
)

MockClient
|> expect(:send_message, fn _,
%BeaconUpdate{transaction_attestations: transaction_attestations},
_ ->
send(me, {:transaction_attestations, transaction_attestations})
{:ok, %Ok{}}
|> expect(:send_message, fn
_, %BeaconUpdate{transaction_attestations: transaction_attestations}, _ ->
send(me, {:transaction_attestations, transaction_attestations})
{:ok, %Ok{}}

_, %ReplicationAttestation{}, _ ->
{:ok, %Ok{}}

_, %NewBeaconTransaction{}, _ ->
{:ok, %Ok{}}
end)

Subset.subscribe_for_beacon_updates(subset, first_public_key)
Expand Down
17 changes: 17 additions & 0 deletions test/archethic/bootstrap/network_init_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ defmodule Archethic.Bootstrap.NetworkInitTest do
alias Archethic.P2P.Message.TransactionList
alias Archethic.P2P.Message.TransactionInputList
alias Archethic.P2P.Node
alias Archethic.P2P.Message.GetFirstAddress
alias Archethic.P2P.Message.NotFound

alias Archethic.SharedSecrets
alias Archethic.SharedSecrets.MemTables.NetworkLookup
Expand Down Expand Up @@ -153,6 +155,9 @@ defmodule Archethic.Bootstrap.NetworkInitTest do

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}

_, %GetFirstAddress{}, _ ->
{:ok, %NotFound{}}
end)

Crypto.generate_deterministic_keypair("daily_nonce_seed")
Expand Down Expand Up @@ -209,6 +214,9 @@ defmodule Archethic.Bootstrap.NetworkInitTest do

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}

_, %GetFirstAddress{}, _ ->
{:ok, %NotFound{}}
end)

me = self()
Expand Down Expand Up @@ -263,6 +271,9 @@ defmodule Archethic.Bootstrap.NetworkInitTest do

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}

_, %GetFirstAddress{}, _ ->
{:ok, %NotFound{}}
end)

P2P.add_and_connect_node(%Node{
Expand Down Expand Up @@ -308,6 +319,9 @@ defmodule Archethic.Bootstrap.NetworkInitTest do

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}

_, %GetFirstAddress{}, _ ->
{:ok, %NotFound{}}
end)

network_pool_seed = :crypto.strong_rand_bytes(32)
Expand Down Expand Up @@ -352,6 +366,9 @@ defmodule Archethic.Bootstrap.NetworkInitTest do

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}

_, %GetFirstAddress{}, _ ->
{:ok, %NotFound{}}
end)

me = self()
Expand Down
5 changes: 5 additions & 0 deletions test/archethic/bootstrap_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ defmodule Archethic.BootstrapTest do
alias Archethic.P2P.Message.TransactionInputList
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Node
alias Archethic.P2P.Message.GetFirstAddress
alias Archethic.P2P.Message.NotFound

alias Archethic.Replication

Expand Down Expand Up @@ -80,6 +82,9 @@ defmodule Archethic.BootstrapTest do

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}

_, %GetFirstAddress{}, _ ->
{:ok, %NotFound{}}
end)

{:ok, daily_nonce_agent} = Agent.start_link(fn -> %{} end)
Expand Down
7 changes: 6 additions & 1 deletion test/archethic/replication/transaction_context_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ defmodule Archethic.Replication.TransactionContextTest do
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput
alias Archethic.TransactionChain.TransactionInput

alias Archethic.P2P.Message.GetFirstAddress
alias Archethic.P2P.Message.NotFound
# alias Archethic.P2P.Message.FirstAddress
import Mox

test "fetch_transaction/1 should retrieve the transaction" do
Expand Down Expand Up @@ -52,6 +54,9 @@ defmodule Archethic.Replication.TransactionContextTest do

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}

_, %GetFirstAddress{}, _ ->
{:ok, %NotFound{}}
end)

P2P.add_and_connect_node(%Node{
Expand Down
5 changes: 5 additions & 0 deletions test/archethic/replication_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ defmodule Archethic.ReplicationTest do
alias Archethic.P2P.Message.TransactionInputList
alias Archethic.P2P.Message.TransactionList
alias Archethic.P2P.Node
alias Archethic.P2P.Message.GetFirstAddress
# alias Archethic.P2P.Message.FirstAddress

alias Archethic.Replication

Expand Down Expand Up @@ -99,6 +101,9 @@ defmodule Archethic.ReplicationTest do

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}

_, %GetFirstAddress{}, _ ->
{:ok, %NotFound{}}
end)

assert :ok = Replication.validate_and_store_transaction_chain(tx, [])
Expand Down
5 changes: 5 additions & 0 deletions test/archethic/self_repair/sync/transaction_handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ defmodule Archethic.SelfRepair.Sync.TransactionHandlerTest do
alias Archethic.P2P.Message.TransactionList
alias Archethic.P2P.Message.UnspentOutputList
alias Archethic.P2P.Node
alias Archethic.P2P.Message.GetFirstAddress
# alias Archethic.P2P.Message.FirstAddress

alias Archethic.SelfRepair.Sync.TransactionHandler
alias Archethic.SharedSecrets.MemTables.NetworkLookup
Expand Down Expand Up @@ -152,6 +154,9 @@ defmodule Archethic.SelfRepair.Sync.TransactionHandlerTest do

_, %GetTransactionChainLength{}, _ ->
%TransactionChainLength{length: 1}

_, %GetFirstAddress{}, _ ->
{:ok, %NotFound{}}
end)

MockDB
Expand Down
Loading