Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve latency handling #494

Merged
6 commits merged into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions lib/archethic/account/mem_tables/token_ledger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@ defmodule Archethic.Account.MemTables.TokenLedger do
Initialize the Token ledger tables:
- Main Token ledger as ETS set ({token, to, from, token_id}, amount, spent?)
- Token Unspent Output Index as ETS bag (to, {from, token, token_id})

## Examples

iex> {:ok, _} = TokenLedger.start_link()
iex> { :ets.info(:archethic_token_ledger)[:type], :ets.info(:archethic_token_unspent_output_index)[:type] }
{ :set, :bag }
"""
def start_link(args \\ []) do
GenServer.start_link(__MODULE__, args)
Expand Down
5 changes: 0 additions & 5 deletions lib/archethic/account/mem_tables/uco_ledger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ defmodule Archethic.Account.MemTables.UCOLedger do
- Main UCO ledger as ETS set ({to, from}, amount, spent?)
- UCO Unspent Output Index as ETS bag (to, from)

## Examples

iex> {:ok, _} = UCOLedger.start_link()
iex> { :ets.info(:archethic_uco_ledger)[:type], :ets.info(:archethic_uco_unspent_output_index)[:type] }
{ :set, :bag }
"""
def start_link(args \\ []) do
GenServer.start_link(__MODULE__, args)
Expand Down
34 changes: 24 additions & 10 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,13 @@ defmodule Archethic.BeaconChain do
@doc """
Return a list of beacon summaries from a list of transaction addresses
"""
@spec get_beacon_summaries(list(binary)) :: Enumerable.t() | list(Summary.t())
@spec get_beacon_summaries(list(binary)) :: list(Summary.t())
def get_beacon_summaries(addresses) do
addresses
|> Stream.map(&get_summary/1)
|> Stream.reject(&match?({:error, :not_found}, &1))
|> Stream.map(fn {:ok, summary} -> summary end)
|> Enum.to_list()
end

@doc """
Expand Down Expand Up @@ -342,15 +343,28 @@ defmodule Archethic.BeaconChain do
defp fetch_summaries(node, addresses) do
addresses
|> Stream.chunk_every(10)
|> Stream.flat_map(fn addresses ->
case P2P.send_message(node, %GetBeaconSummaries{addresses: addresses}) do
{:ok, %BeaconSummaryList{summaries: summaries}} ->
summaries

_ ->
[]
end
end)
|> Stream.flat_map(&batch_summaries_fetching(&1, node))
|> Enum.to_list()
end

defp batch_summaries_fetching(addresses, node) do
%{summaries: local_summaries, remaining: remaining} =
Enum.reduce(addresses, %{summaries: [], remaining: []}, fn addr, acc ->
case get_summary(addr) do
{:ok, summary} ->
Map.update!(acc, :summaries, &[summary | &1])

_ ->
Map.update!(acc, :remaining, &[addr | &1])
end
end)

case P2P.send_message(node, %GetBeaconSummaries{addresses: remaining}) do
{:ok, %BeaconSummaryList{summaries: fetched_summaries}} ->
local_summaries ++ fetched_summaries

_ ->
local_summaries
end
end
end
4 changes: 2 additions & 2 deletions lib/archethic/beacon_chain/slot/end_of_node_sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ defmodule Archethic.BeaconChain.Slot.EndOfNodeSync do
}
end

@spec from_map(map()) :: t()
def from_map(%{public_key: public_key, timestamp: timestamp}) do
@spec cast(map()) :: t()
def cast(%{public_key: public_key, timestamp: timestamp}) do
%__MODULE__{
public_key: public_key,
timestamp: timestamp
Expand Down
4 changes: 2 additions & 2 deletions lib/archethic/db/embedded_impl/chain_reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
Encoding.decode(version, column, data, acc)
end)
|> Utils.atomize_keys()
|> Transaction.from_map()
|> Transaction.cast()

:file.close(fd)

Expand Down Expand Up @@ -153,7 +153,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
Encoding.decode(version, column, data, acc)
end)
|> Utils.atomize_keys()
|> Transaction.from_map()
|> Transaction.cast()

if tx.address == limit_address do
{Enum.reverse([tx | acc]), false, nil}
Expand Down
25 changes: 25 additions & 0 deletions lib/archethic/mining.ex
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,31 @@ defmodule Archethic.Mining do
|> DistributedWorkflow.add_cross_validation_stamp(stamp)
end

@doc """
Confirm the replication from a storage node
"""
@spec confirm_replication(
address :: binary(),
signature :: binary(),
node_public_key :: Crypto.key()
) ::
:ok
def confirm_replication(tx_address, signature, node_public_key) do
pid = get_mining_process!(tx_address)
send(pid, {:ack_replication, signature, node_public_key})
:ok
end

@doc """
Notify replication to the mining process
"""
@spec notify_replication_error(binary(), any()) :: :ok
def notify_replication_error(tx_address, error_reason) do
pid = get_mining_process!(tx_address)
send(pid, {:replication_error, error_reason})
:ok
end

defp get_mining_process!(tx_address, timeout \\ @mining_timeout) do
retry_while with: exponential_backoff(100, 2) |> expiry(timeout) do
case Registry.lookup(WorkflowRegistry, tx_address) do
Expand Down
70 changes: 13 additions & 57 deletions lib/archethic/mining/distributed_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,16 @@ defmodule Archethic.Mining.DistributedWorkflow do

alias Archethic.P2P
alias Archethic.P2P.Message
alias Archethic.P2P.Message.AcknowledgeStorage
alias Archethic.P2P.Message.AddMiningContext
alias Archethic.P2P.Message.CrossValidate
alias Archethic.P2P.Message.CrossValidationDone
alias Archethic.P2P.Message.Error
alias Archethic.P2P.Message.ValidationError
alias Archethic.P2P.Message.ReplicateTransactionChain
alias Archethic.P2P.Message.ReplicateTransaction
alias Archethic.P2P.Message.ValidationError
alias Archethic.P2P.Node

alias Archethic.Replication

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations
Expand Down Expand Up @@ -673,7 +669,7 @@ defmodule Archethic.Mining.DistributedWorkflow do

def handle_event(
:info,
{:add_ack_storage, node_public_key, signature},
{:ack_replication, signature, node_public_key},
:replication,
data = %{start_time: start_time, context: context = %ValidationContext{transaction: tx}}
) do
Expand Down Expand Up @@ -739,14 +735,16 @@ defmodule Archethic.Mining.DistributedWorkflow do
beacon_storage_nodes = ValidationContext.get_beacon_replication_nodes(context)
P2P.broadcast_message(P2P.distinct_nodes([welcome_node | beacon_storage_nodes]), message)

validated_tx = ValidationContext.get_validated_transaction(context)

context
|> ValidationContext.get_io_replication_nodes()
|> P2P.broadcast_message(
with tx <- ValidationContext.get_validated_transaction(context) do
if Transaction.network_type?(tx.type),
do: %ReplicateTransactionChain{transaction: tx},
else: %ReplicateTransaction{transaction: tx}
end
if Transaction.network_type?(validated_tx.type),
do: %ReplicateTransactionChain{
transaction: validated_tx
},
else: %ReplicateTransaction{transaction: validated_tx}
)

:keep_state_and_data
Expand Down Expand Up @@ -986,8 +984,7 @@ defmodule Archethic.Mining.DistributedWorkflow do

defp request_replication(
context = %ValidationContext{
transaction: tx,
previous_transaction: previous_transaction
transaction: tx
}
) do
storage_nodes = ValidationContext.get_chain_replication_nodes(context)
Expand All @@ -998,54 +995,13 @@ defmodule Archethic.Mining.DistributedWorkflow do
transaction_type: tx.type
)

# Get transaction chain size to calculate timeout
chain_size =
case previous_transaction do
nil ->
1

previous_transaction ->
# Get transaction chain size to calculate timeout
case Archethic.get_transaction_chain_length(previous_transaction.address) do
{:ok, chain_size} ->
chain_size

_ ->
1
end
end

timeout = Message.get_max_timeout() + Message.get_max_timeout() * chain_size

validated_tx = ValidationContext.get_validated_transaction(context)

message = %ReplicateTransactionChain{
transaction: validated_tx
transaction: validated_tx,
replying_node: Crypto.first_node_public_key()
}

me = self()

Task.Supervisor.async_stream_nolink(
TaskSupervisor,
storage_nodes,
fn node ->
{P2P.send_message(node, message, timeout), node}
end,
ordered: false,
on_timeout: :kill_task,
timeout: timeout + 2000
)
|> Stream.filter(&match?({:ok, {{:ok, _}, _}}, &1))
|> Stream.map(fn {:ok, {{:ok, response}, node}} -> {response, node} end)
|> Stream.each(fn
{%Error{reason: reason}, _node} ->
send(me, {:replication_error, reason})

{%AcknowledgeStorage{
signature: signature
}, %Node{first_public_key: node_public_key}} ->
send(me, {:add_ack_storage, node_public_key, signature})
end)
|> Stream.run()
P2P.broadcast_message(storage_nodes, message)
end
end
Loading