Improve P2P (#441)
* Timeout is returned by state management

* Set dynamic timeout for single tx message with full data

* Set dynamic timeout for distributed workflow

* Calculate timeout for GetTxChain message

* Update quorum to read first request concurrently
Neylix committed Jul 12, 2022
1 parent 111c702 commit e3bcecc
Showing 20 changed files with 271 additions and 100 deletions.
5 changes: 4 additions & 1 deletion config/config.exs
@@ -103,6 +103,9 @@ config :archethic, Archethic.P2P.Listener,
transport: :tcp,
port: 3002

# Floor upload speed in bytes/sec (1Mb/sec -> 0.125MB/s)
config :archethic, Archethic.P2P.Message, floor_upload_speed: 125_000

config :archethic, Archethic.SelfRepair.Sync, last_sync_file: "p2p/last_sync"

# Configure the endpoint
@@ -117,7 +120,7 @@ config :archethic, ArchethicWeb.Endpoint,

config :archethic, Archethic.Mining.DistributedWorkflow,
global_timeout: 60_000,
coordinator_notification_timeout: 5_000,
coordinator_timeout_supplement: 2_000,
context_notification_timeout: 3_000

config :archethic, Archethic.OracleChain,
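The floor_upload_speed setting added above lets per-message timeouts scale with the expected payload size rather than relying on the previous fixed 3-second default. A minimal sketch of the idea, assuming the timeout is derived by dividing the payload size by this floor speed; the module name, function name, and base value below are illustrative, not the actual Archethic.P2P.Message implementation:

defmodule TimeoutSketch do
  # Mirrors the floor_upload_speed value configured above (bytes/sec).
  @floor_upload_speed 125_000
  # Assumed base latency budget in milliseconds.
  @base_timeout 3_000

  # Timeout (ms) scaled to the payload size at the floor upload speed.
  def timeout_for(payload_bytes) do
    @base_timeout + div(payload_bytes * 1_000, @floor_upload_speed)
  end
end

# A 1 MB payload at 125 kB/s adds ~8 s on top of the base budget:
# TimeoutSketch.timeout_for(1_000_000) #=> 11_000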
56 changes: 44 additions & 12 deletions lib/archethic/mining/distributed_workflow.ex
@@ -26,6 +26,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
alias Archethic.Mining.WorkflowRegistry

alias Archethic.P2P
alias Archethic.P2P.Message
alias Archethic.P2P.Message.AcknowledgeStorage
alias Archethic.P2P.Message.AddMiningContext
alias Archethic.P2P.Message.CrossValidate
@@ -50,10 +51,10 @@ defmodule Archethic.Mining.DistributedWorkflow do
use GenStateMachine, callback_mode: [:handle_event_function, :state_enter], restart: :temporary

@mining_timeout Application.compile_env!(:archethic, [__MODULE__, :global_timeout])
@coordinator_notification_timeout Application.compile_env!(:archethic, [
__MODULE__,
:coordinator_notification_timeout
])
@coordinator_timeout_supplement Application.compile_env!(:archethic, [
__MODULE__,
:coordinator_timeout_supplement
])
@context_notification_timeout Application.compile_env!(:archethic, [
__MODULE__,
:context_notification_timeout
@@ -126,6 +127,16 @@ defmodule Archethic.Mining.DistributedWorkflow do
GenStateMachine.cast(pid, {:add_cross_validation_stamp, stamp})
end

defp get_context_timeout(:hosting), do: Message.get_max_timeout()
defp get_context_timeout(:oracle), do: @context_notification_timeout + 1_000
defp get_context_timeout(_type), do: @context_notification_timeout

defp get_coordinator_timeout(type),
do: get_context_timeout(type) + @coordinator_timeout_supplement

defp get_mining_timeout(type) when type == :hosting, do: @mining_timeout * 3
defp get_mining_timeout(_type), do: @mining_timeout

def init(opts) do
{tx, welcome_node, validation_nodes, node_public_key, timeout} = parse_opts(opts)

@@ -200,7 +211,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
welcome_node = Keyword.get(opts, :welcome_node)
validation_nodes = Keyword.get(opts, :validation_nodes)
node_public_key = Keyword.get(opts, :node_public_key)
timeout = Keyword.get(opts, :timeout, @mining_timeout)
timeout = Keyword.get(opts, :timeout, get_mining_timeout(tx.type))

{tx, welcome_node, validation_nodes, node_public_key, timeout}
end
@@ -342,7 +353,7 @@
# TODO: Provide a better waiting time management
# for example rolling percentile latency could be way to achieve this
# (https://cs.stackexchange.com/a/129178)
waiting_time = @context_notification_timeout
waiting_time = get_context_timeout(tx.type)

Logger.debug(
"Coordinator will wait #{waiting_time} ms before continue with the responding nodes",
@@ -357,7 +368,7 @@

:cross_validator ->
[
{{:timeout, :change_coordinator}, @coordinator_notification_timeout, :any},
{{:timeout, :change_coordinator}, get_coordinator_timeout(tx.type), :any},
{{:timeout, :stop_timeout}, timeout, :any}
]
end
@@ -415,7 +426,7 @@
)

actions = [
{{:timeout, :wait_confirmations}, @context_notification_timeout, :any}
{{:timeout, :wait_confirmations}, get_context_timeout(tx.type), :any}
]

{:keep_state_and_data, actions}
@@ -800,7 +811,7 @@
{:next_state, :coordinator, %{data | context: new_context}}
else
actions = [
{{:timeout, :change_coordinator}, @coordinator_notification_timeout, :any},
{{:timeout, :change_coordinator}, get_coordinator_timeout(tx.type), :any},
{:next_event, :internal, :notify_context}
]

@@ -920,7 +931,8 @@

defp request_replication(
context = %ValidationContext{
transaction: tx
transaction: tx,
previous_transaction: previous_transaction
}
) do
storage_nodes = ValidationContext.get_chain_replication_nodes(context)
@@ -931,6 +943,25 @@
transaction_type: tx.type
)

# Get transaction chain size to calculate timeout
chain_size =
case previous_transaction do
nil ->
1

previous_transaction ->
# Get transaction chain size to calculate timeout
case Archethic.get_transaction_chain_length(previous_transaction.address) do
{:ok, chain_size} ->
chain_size

_ ->
1
end
end

timeout = Message.get_max_timeout() + Message.get_max_timeout() * chain_size

validated_tx = ValidationContext.get_validated_transaction(context)

message = %ReplicateTransactionChain{
@@ -943,10 +974,11 @@
TaskSupervisor,
storage_nodes,
fn node ->
{P2P.send_message(node, message), node}
{P2P.send_message(node, message, timeout), node}
end,
ordered: false,
on_timeout: :kill_task
on_timeout: :kill_task,
timeout: timeout
)
|> Stream.filter(&match?({:ok, {{:ok, _}, _}}, &1))
|> Stream.map(fn {:ok, {{:ok, response}, node}} -> {response, node} end)
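The replication timeout computed above grows linearly with the length of the chain being replicated: one Message.get_max_timeout/0 budget for the new transaction plus one per transaction already stored. A worked example, with the maximum timeout value assumed purely for illustration:

# Assumed figure; the real value comes from Archethic.P2P.Message.get_max_timeout/0.
max_timeout = 60_000
chain_size = 3

# timeout = Message.get_max_timeout() + Message.get_max_timeout() * chain_size
timeout = max_timeout + max_timeout * chain_size
#=> 240_000 ms, i.e. 4 minutes to replicate a 3-transaction chain plus the new transaction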
6 changes: 3 additions & 3 deletions lib/archethic/mining/transaction_context.ex
@@ -10,6 +10,7 @@ defmodule Archethic.Mining.TransactionContext do
alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Message.Ping
alias Archethic.P2P.Node
@@ -61,7 +62,7 @@ defmodule Archethic.Mining.TransactionContext do
utxo_task = request_utxo(previous_address, unspent_outputs_nodes_split)
nodes_view_task = request_nodes_view(node_public_keys)

prev_tx = Task.await(prev_tx_task)
prev_tx = Task.await(prev_tx_task, Message.get_max_timeout())
utxos = Task.await(utxo_task)
nodes_view = Task.await(nodes_view_task)

@@ -130,8 +131,7 @@
fn node_public_key ->
{node_public_key, P2P.send_message(node_public_key, %Ping{}, 500)}
end,
on_timeout: :kill_task,
timeout: 500
on_timeout: :kill_task
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.map(fn
113 changes: 66 additions & 47 deletions lib/archethic/p2p.ex
@@ -216,7 +216,7 @@ defmodule Archethic.P2P do
For more details see `send_message/3`
"""
@spec send_message!(Crypto.key() | Node.t(), Message.request(), timeout()) :: Message.response()
def send_message!(node, message, timeout \\ 3_000)
def send_message!(node, message, timeout \\ 0)

def send_message!(public_key, message, timeout) when is_binary(public_key) do
public_key
@@ -249,7 +249,7 @@ defmodule Archethic.P2P do
| {:error, :not_found}
| {:error, :timeout}
| {:error, :closed}
def send_message(node, message, timeout \\ 3_000)
def send_message(node, message, timeout \\ 0)

def send_message(public_key, message, timeout) when is_binary(public_key) do
case get_node_info(public_key) do
@@ -261,7 +261,12 @@
end
end

defdelegate send_message(node, message, timeout), to: Client
def send_message(node, message, timeout) do
timeout = if timeout == 0, do: Message.get_timeout(message), else: timeout
do_send_message(node, message, timeout)
end

defdelegate do_send_message(node, message, timeout), to: Client, as: :send_message

@doc """
Get the nearest nodes from a specified node and a list of nodes to compare with
@@ -471,7 +476,8 @@
def broadcast_message(nodes, message) do
Task.Supervisor.async_stream_nolink(TaskSupervisor, nodes, &send_message(&1, message),
ordered: false,
on_timeout: :kill_task
on_timeout: :kill_task,
timeout: Message.get_timeout(message)
)
|> Stream.run()
end
@@ -557,75 +563,88 @@
node_list :: list(Node.t()),
message :: Message.t(),
conflict_resolver :: (list(Message.t()) -> Message.t()),
timeout :: non_neg_integer(),
consistency_level :: pos_integer()
) ::
{:ok, Message.t()} | {:error, :network_issue}
def quorum_read(
nodes,
message,
conflict_resolver \\ fn results -> List.first(results) end,
consistency_level \\ 2
timeout \\ 0,
consistency_level \\ 3
)

def quorum_read([], _, _, _), do: {:error, :network_issue}

def quorum_read(
[node | rest],
message,
conflict_resolver,
consistency_level
) do
# We request the first node and
case send_message(node, message) do
{:ok, result} ->
# Then we try to reach performing monotonic quorum read (using the conflict resolver)
do_quorum_read(
rest,
message,
conflict_resolver,
consistency_level,
result
)

{:error, _} ->
quorum_read(rest, message, conflict_resolver, consistency_level)
end
def quorum_read(nodes, message, conflict_resolver, timeout, consistency_level) do
do_quorum_read(nodes, message, conflict_resolver, timeout, consistency_level, nil)
end

defp do_quorum_read([], _message, _conflict_resolver, _consistency_level, prior_result),
do: {:ok, prior_result}

defp do_quorum_read(nodes, message, conflict_resolver, consistency_level, prior_result) do
defp do_quorum_read([], _, _, _, _, nil), do: {:error, :network_issue}
defp do_quorum_read([], _, _, _, _, previous_result), do: {:ok, previous_result}

defp do_quorum_read(
nodes,
message,
conflict_resolver,
timeout,
consistency_level,
previous_result
) do
# We determine how many nodes to fetch for the quorum from the consistency level
{group, rest} = Enum.split(nodes, consistency_level)

timeout = if timeout == 0, do: Message.get_timeout(message), else: timeout

results =
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
group,
&send_message(&1, message),
&send_message(&1, message, timeout),
ordered: false,
on_timeout: :kill_task
on_timeout: :kill_task,
timeout: timeout
)
|> Stream.filter(&match?({:ok, {:ok, _}}, &1))
|> Stream.map(fn {:ok, {:ok, res}} -> res end)
|> Enum.to_list()

# If no nodes answered we try another group
if Enum.empty?(results) do
do_quorum_read(rest, message, conflict_resolver, consistency_level, prior_result)
else
distinct_elems = Enum.dedup([prior_result | results])
case length(results) do
0 ->
do_quorum_read(
rest,
message,
conflict_resolver,
consistency_level,
timeout,
previous_result
)

# If the results are the same, then we reached consistency
if length(distinct_elems) == 1 do
{:ok, prior_result}
else
# If the results differ, we can apply a conflict resolver to merge the result into
# a consistent response
resolved_result = conflict_resolver.(distinct_elems)
{:ok, resolved_result}
end
1 ->
if previous_result != nil do
do_quorum([previous_result | results], conflict_resolver)
else
result = List.first(results)
do_quorum_read(rest, message, conflict_resolver, consistency_level - 1, timeout, result)
end

_ ->
results = if previous_result != nil, do: [previous_result | results], else: results
do_quorum(results, conflict_resolver)
end
end

defp do_quorum(results, conflict_resolver) do
distinct_elems = Enum.dedup(results)

# If the results are the same, then we reached consistency
if length(distinct_elems) == 1 do
{:ok, List.first(distinct_elems)}
else
# If the results differ, we can apply a conflict resolver to merge the result into
# a consistent response
resolved_result = conflict_resolver.(distinct_elems)
{:ok, resolved_result}
end
end
end
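With the rewrite above, quorum_read/5 no longer waits on a single node before building the quorum: the first consistency_level nodes are queried concurrently, a lone answer is carried forward and checked against additional nodes, and conflicting answers go through the resolver. A usage sketch; the node-listing call is assumed for illustration, and leaving timeout at its default of 0 lets Message.get_timeout/1 pick the value:

alias Archethic.P2P
alias Archethic.P2P.Message.Ping

# Node list assumed for illustration; any list of %Node{} structs works.
nodes = P2P.authorized_and_available_nodes()

# Defaults: timeout 0 resolves to Message.get_timeout(%Ping{}) and
# consistency_level 3 sends the first batch of three requests concurrently.
{:ok, _response} = P2P.quorum_read(nodes, %Ping{}, fn results -> List.first(results) end)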
7 changes: 3 additions & 4 deletions lib/archethic/p2p/client/connection.ex
@@ -42,9 +42,6 @@ defmodule Archethic.P2P.Client.Connection do
receive do
{^ref, msg} ->
msg
after
timeout ->
{:error, :timeout}
end
end

@@ -242,12 +239,14 @@
data = %{node_public_key: node_public_key}
) do
case pop_in(data, [:messages, msg_id]) do
{%{message_name: message_name}, new_data} ->
{%{from: from, ref: ref, message_name: message_name}, new_data} ->
Logger.debug("Message #{message_name} reaches its timeout",
node: Base.encode16(node_public_key),
message_id: msg_id
)

send(from, {ref, {:error, :timeout}})

{:keep_state, new_data}

{nil, _} ->
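With the after clause removed from the caller's receive (first hunk above), timeout handling now lives in the connection's state machine: when a tracked request expires, the process replies to the stored from/ref pair with {ref, {:error, :timeout}}. A simplified sketch of the awaiting side under that assumption:

# `ref` is the reference registered with the connection when the request was sent.
receive do
  {^ref, {:ok, response}} -> {:ok, response}
  {^ref, {:error, reason}} -> {:error, reason}
end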